Merge pull request #990 from evrardjp/first_refactors

Cleanup Part 1 - First refactors
Jean-Philippe Evrard, 2024-10-27 21:45:00 +01:00 (committed by GitHub)
19 changed files with 1103 additions and 1005 deletions


@@ -8,37 +8,32 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/containrrr/shoutrrr"
"github.com/kubereboot/kured/internal"
"github.com/kubereboot/kured/pkg/blockers"
"github.com/kubereboot/kured/pkg/checkers"
"github.com/kubereboot/kured/pkg/daemonsetlock"
"github.com/kubereboot/kured/pkg/delaytick"
"github.com/kubereboot/kured/pkg/reboot"
"github.com/kubereboot/kured/pkg/taints"
"github.com/kubereboot/kured/pkg/timewindow"
papi "github.com/prometheus/client_golang/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
flag "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kubectldrain "k8s.io/kubectl/pkg/drain"
"github.com/google/shlex"
shoutrrr "github.com/containrrr/shoutrrr"
"github.com/kubereboot/kured/pkg/alerts"
"github.com/kubereboot/kured/pkg/daemonsetlock"
"github.com/kubereboot/kured/pkg/delaytick"
"github.com/kubereboot/kured/pkg/reboot"
"github.com/kubereboot/kured/pkg/taints"
"github.com/kubereboot/kured/pkg/timewindow"
"github.com/kubereboot/kured/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
@@ -63,7 +58,7 @@ var (
lockReleaseDelay time.Duration
prometheusURL string
preferNoScheduleTaintName string
alertFilter *regexp.Regexp
alertFilter regexpValue
alertFilterMatchOnly bool
alertFiringOnly bool
rebootSentinelFile string
@@ -108,11 +103,6 @@ const (
// EnvPrefix The environment variable prefix of all environment variables bound to our command line flags.
EnvPrefix = "KURED"
// MethodCommand is used as "--reboot-method" value when rebooting with the configured "--reboot-command"
MethodCommand = "command"
// MethodSignal is used as "--reboot-method" value when rebooting with a SIGRTMIN+5 signal.
MethodSignal = "signal"
sigTrminPlus5 = 34 + 5
)
@@ -121,138 +111,178 @@ func init() {
}
func main() {
cmd := NewRootCommand()
if err := cmd.Execute(); err != nil {
log.Fatal(err)
}
}
// NewRootCommand constructs the Cobra root command
func NewRootCommand() *cobra.Command {
rootCmd := &cobra.Command{
Use: "kured",
Short: "Kubernetes Reboot Daemon",
PersistentPreRunE: bindViper,
PreRun: flagCheck,
Run: root}
rootCmd.PersistentFlags().StringVar(&nodeID, "node-id", "",
flag.StringVar(&nodeID, "node-id", "",
"node name kured runs on, should be passed down from spec.nodeName via KURED_NODE_ID environment variable")
rootCmd.PersistentFlags().BoolVar(&forceReboot, "force-reboot", false,
flag.BoolVar(&forceReboot, "force-reboot", false,
"force a reboot even if the drain fails or times out")
rootCmd.PersistentFlags().StringVar(&metricsHost, "metrics-host", "",
flag.StringVar(&metricsHost, "metrics-host", "",
"host where metrics will listen")
rootCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", 8080,
flag.IntVar(&metricsPort, "metrics-port", 8080,
"port number where metrics will listen")
rootCmd.PersistentFlags().IntVar(&drainGracePeriod, "drain-grace-period", -1,
flag.IntVar(&drainGracePeriod, "drain-grace-period", -1,
"time in seconds given to each pod to terminate gracefully, if negative, the default value specified in the pod will be used")
rootCmd.PersistentFlags().StringVar(&drainPodSelector, "drain-pod-selector", "",
flag.StringVar(&drainPodSelector, "drain-pod-selector", "",
"only drain pods with labels matching the selector (default: '', all pods)")
rootCmd.PersistentFlags().IntVar(&skipWaitForDeleteTimeoutSeconds, "skip-wait-for-delete-timeout", 0,
flag.IntVar(&skipWaitForDeleteTimeoutSeconds, "skip-wait-for-delete-timeout", 0,
"when seconds is greater than zero, skip waiting for the pods whose deletion timestamp is older than N seconds while draining a node")
rootCmd.PersistentFlags().DurationVar(&drainDelay, "drain-delay", 0,
flag.DurationVar(&drainDelay, "drain-delay", 0,
"delay drain for this duration (default: 0, disabled)")
rootCmd.PersistentFlags().DurationVar(&drainTimeout, "drain-timeout", 0,
flag.DurationVar(&drainTimeout, "drain-timeout", 0,
"timeout after which the drain is aborted (default: 0, infinite time)")
rootCmd.PersistentFlags().DurationVar(&rebootDelay, "reboot-delay", 0,
flag.DurationVar(&rebootDelay, "reboot-delay", 0,
"delay reboot for this duration (default: 0, disabled)")
rootCmd.PersistentFlags().StringVar(&rebootMethod, "reboot-method", "command",
flag.StringVar(&rebootMethod, "reboot-method", "command",
"method to use for reboots. Available: command")
rootCmd.PersistentFlags().DurationVar(&period, "period", time.Minute*60,
flag.DurationVar(&period, "period", time.Minute*60,
"sentinel check period")
rootCmd.PersistentFlags().StringVar(&dsNamespace, "ds-namespace", "kube-system",
flag.StringVar(&dsNamespace, "ds-namespace", "kube-system",
"namespace containing daemonset on which to place lock")
rootCmd.PersistentFlags().StringVar(&dsName, "ds-name", "kured",
flag.StringVar(&dsName, "ds-name", "kured",
"name of daemonset on which to place lock")
rootCmd.PersistentFlags().StringVar(&lockAnnotation, "lock-annotation", KuredNodeLockAnnotation,
flag.StringVar(&lockAnnotation, "lock-annotation", KuredNodeLockAnnotation,
"annotation in which to record locking node")
rootCmd.PersistentFlags().DurationVar(&lockTTL, "lock-ttl", 0,
flag.DurationVar(&lockTTL, "lock-ttl", 0,
"expire lock annotation after this duration (default: 0, disabled)")
rootCmd.PersistentFlags().DurationVar(&lockReleaseDelay, "lock-release-delay", 0,
flag.DurationVar(&lockReleaseDelay, "lock-release-delay", 0,
"delay lock release for this duration (default: 0, disabled)")
rootCmd.PersistentFlags().StringVar(&prometheusURL, "prometheus-url", "",
flag.StringVar(&prometheusURL, "prometheus-url", "",
"Prometheus instance to probe for active alerts")
rootCmd.PersistentFlags().Var(&regexpValue{&alertFilter}, "alert-filter-regexp",
flag.Var(&alertFilter, "alert-filter-regexp",
"alert names to ignore when checking for active alerts")
rootCmd.PersistentFlags().BoolVar(&alertFilterMatchOnly, "alert-filter-match-only", false,
flag.BoolVar(&alertFilterMatchOnly, "alert-filter-match-only", false,
"Only block if the alert-filter-regexp matches active alerts")
rootCmd.PersistentFlags().BoolVar(&alertFiringOnly, "alert-firing-only", false,
flag.BoolVar(&alertFiringOnly, "alert-firing-only", false,
"only consider firing alerts when checking for active alerts")
rootCmd.PersistentFlags().StringVar(&rebootSentinelFile, "reboot-sentinel", "/var/run/reboot-required",
flag.StringVar(&rebootSentinelFile, "reboot-sentinel", "/var/run/reboot-required",
"path to file whose existence triggers the reboot command")
rootCmd.PersistentFlags().StringVar(&preferNoScheduleTaintName, "prefer-no-schedule-taint", "",
flag.StringVar(&preferNoScheduleTaintName, "prefer-no-schedule-taint", "",
"Taint name applied during pending node reboot (to prevent receiving additional pods from other rebooting nodes). Disabled by default. Set e.g. to \"weave.works/kured-node-reboot\" to enable tainting.")
rootCmd.PersistentFlags().StringVar(&rebootSentinelCommand, "reboot-sentinel-command", "",
flag.StringVar(&rebootSentinelCommand, "reboot-sentinel-command", "",
"command for which a zero return code will trigger a reboot command")
rootCmd.PersistentFlags().StringVar(&rebootCommand, "reboot-command", "/bin/systemctl reboot",
flag.StringVar(&rebootCommand, "reboot-command", "/bin/systemctl reboot",
"command to run when a reboot is required")
rootCmd.PersistentFlags().IntVar(&concurrency, "concurrency", 1,
flag.IntVar(&concurrency, "concurrency", 1,
"amount of nodes to concurrently reboot. Defaults to 1")
rootCmd.PersistentFlags().IntVar(&rebootSignal, "reboot-signal", sigTrminPlus5,
flag.IntVar(&rebootSignal, "reboot-signal", sigTrminPlus5,
"signal to use for reboot, SIGRTMIN+5 by default.")
rootCmd.PersistentFlags().StringVar(&slackHookURL, "slack-hook-url", "",
flag.StringVar(&slackHookURL, "slack-hook-url", "",
"slack hook URL for reboot notifications [deprecated in favor of --notify-url]")
rootCmd.PersistentFlags().StringVar(&slackUsername, "slack-username", "kured",
flag.StringVar(&slackUsername, "slack-username", "kured",
"slack username for reboot notifications")
rootCmd.PersistentFlags().StringVar(&slackChannel, "slack-channel", "",
flag.StringVar(&slackChannel, "slack-channel", "",
"slack channel for reboot notifications")
rootCmd.PersistentFlags().StringVar(&notifyURL, "notify-url", "",
flag.StringVar(&notifyURL, "notify-url", "",
"notify URL for reboot notifications (cannot use with --slack-hook-url flags)")
rootCmd.PersistentFlags().StringVar(&messageTemplateUncordon, "message-template-uncordon", "Node %s rebooted & uncordoned successfully!",
flag.StringVar(&messageTemplateUncordon, "message-template-uncordon", "Node %s rebooted & uncordoned successfully!",
"message template used to notify about a node being successfully uncordoned")
rootCmd.PersistentFlags().StringVar(&messageTemplateDrain, "message-template-drain", "Draining node %s",
flag.StringVar(&messageTemplateDrain, "message-template-drain", "Draining node %s",
"message template used to notify about a node being drained")
rootCmd.PersistentFlags().StringVar(&messageTemplateReboot, "message-template-reboot", "Rebooting node %s",
flag.StringVar(&messageTemplateReboot, "message-template-reboot", "Rebooting node %s",
"message template used to notify about a node being rebooted")
rootCmd.PersistentFlags().StringArrayVar(&podSelectors, "blocking-pod-selector", nil,
flag.StringArrayVar(&podSelectors, "blocking-pod-selector", nil,
"label selector identifying pods whose presence should prevent reboots")
rootCmd.PersistentFlags().StringSliceVar(&rebootDays, "reboot-days", timewindow.EveryDay,
flag.StringSliceVar(&rebootDays, "reboot-days", timewindow.EveryDay,
"schedule reboot on these days")
rootCmd.PersistentFlags().StringVar(&rebootStart, "start-time", "0:00",
flag.StringVar(&rebootStart, "start-time", "0:00",
"schedule reboot only after this time of day")
rootCmd.PersistentFlags().StringVar(&rebootEnd, "end-time", "23:59:59",
flag.StringVar(&rebootEnd, "end-time", "23:59:59",
"schedule reboot only before this time of day")
rootCmd.PersistentFlags().StringVar(&timezone, "time-zone", "UTC",
flag.StringVar(&timezone, "time-zone", "UTC",
"use this timezone for schedule inputs")
rootCmd.PersistentFlags().BoolVar(&annotateNodes, "annotate-nodes", false,
flag.BoolVar(&annotateNodes, "annotate-nodes", false,
"if set, the annotations 'weave.works/kured-reboot-in-progress' and 'weave.works/kured-most-recent-reboot-needed' will be given to nodes undergoing kured reboots")
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text",
flag.StringVar(&logFormat, "log-format", "text",
"use text or json log format")
rootCmd.PersistentFlags().StringSliceVar(&preRebootNodeLabels, "pre-reboot-node-labels", nil,
flag.StringSliceVar(&preRebootNodeLabels, "pre-reboot-node-labels", nil,
"labels to add to nodes before cordoning")
rootCmd.PersistentFlags().StringSliceVar(&postRebootNodeLabels, "post-reboot-node-labels", nil,
flag.StringSliceVar(&postRebootNodeLabels, "post-reboot-node-labels", nil,
"labels to add to nodes after uncordoning")
return rootCmd
flag.Parse()
// Load flags from environment variables
LoadFromEnv()
log.Infof("Kubernetes Reboot Daemon: %s", version)
if logFormat == "json" {
log.SetFormatter(&log.JSONFormatter{})
}
if nodeID == "" {
log.Fatal("KURED_NODE_ID environment variable required")
}
log.Infof("Node ID: %s", nodeID)
notifyURL = validateNotificationURL(notifyURL, slackHookURL)
err := validateNodeLabels(preRebootNodeLabels, postRebootNodeLabels)
if err != nil {
log.Warnf(err.Error())
}
log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
// This should be printed from blocker list instead of only blocking pod selectors
log.Infof("Blocking Pod Selectors: %v", podSelectors)
log.Infof("Reboot period %v", period)
log.Infof("Concurrency: %v", concurrency)
if annotateNodes {
log.Infof("Will annotate nodes during kured reboot operations")
}
// Now call the rest of the main loop.
window, err := timewindow.New(rebootDays, rebootStart, rebootEnd, timezone)
if err != nil {
log.Fatalf("Failed to build time window: %v", err)
}
log.Infof("Reboot schedule: %v", window)
log.Infof("Reboot method: %s", rebootMethod)
rebooter, err := internal.NewRebooter(rebootMethod, rebootCommand, rebootSignal)
if err != nil {
log.Fatalf("Failed to build rebooter: %v", err)
}
rebootChecker, err := internal.NewRebootChecker(rebootSentinelCommand, rebootSentinelFile)
if err != nil {
log.Fatalf("Failed to build reboot checker: %v", err)
}
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
if lockTTL > 0 {
log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
} else {
log.Info("Lock TTL not set, lock will remain until being released")
}
if lockReleaseDelay > 0 {
log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
} else {
log.Info("Lock release delay not set, lock will be released immediately after rebooting")
}
lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay)
go rebootAsRequired(nodeID, rebooter, rebootChecker, window, lock, client)
go maintainRebootRequiredMetric(nodeID, rebootChecker)
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", metricsHost, metricsPort), nil))
}
// flagCheck checks for deprecated slack-notification-related flags and for node labels that do not match
func flagCheck(cmd *cobra.Command, args []string) {
if slackHookURL != "" && notifyURL != "" {
log.Warnf("Cannot use both --notify-url and --slack-hook-url flags. Kured will use --notify-url flag only...")
}
if notifyURL != "" {
notifyURL = stripQuotes(notifyURL)
} else if slackHookURL != "" {
slackHookURL = stripQuotes(slackHookURL)
log.Warnf("Deprecated flag(s). Please use --notify-url flag instead.")
trataURL, err := url.Parse(slackHookURL)
if err != nil {
log.Warnf("slack-hook-url is not properly formatted... no notification will be sent: %v\n", err)
}
if len(strings.Split(strings.Trim(trataURL.Path, "/services/"), "/")) != 3 {
log.Warnf("slack-hook-url is not properly formatted... no notification will be sent: unexpected number of / in URL\n")
} else {
notifyURL = fmt.Sprintf("slack://%s", strings.Trim(trataURL.Path, "/services/"))
}
}
func validateNodeLabels(preRebootNodeLabels []string, postRebootNodeLabels []string) error {
var preRebootNodeLabelKeys, postRebootNodeLabelKeys []string
for _, label := range preRebootNodeLabels {
preRebootNodeLabelKeys = append(preRebootNodeLabelKeys, strings.Split(label, "=")[0])
@@ -263,8 +293,95 @@ func flagCheck(cmd *cobra.Command, args []string) {
sort.Strings(preRebootNodeLabelKeys)
sort.Strings(postRebootNodeLabelKeys)
if !reflect.DeepEqual(preRebootNodeLabelKeys, postRebootNodeLabelKeys) {
log.Warnf("pre-reboot-node-labels keys and post-reboot-node-labels keys do not match. This may result in unexpected behaviour.")
return fmt.Errorf("pre-reboot-node-labels keys and post-reboot-node-labels keys do not match, resulting in unexpected behaviour")
}
return nil
}
func validateNotificationURL(notifyURL string, slackHookURL string) string {
switch {
case slackHookURL != "" && notifyURL != "":
log.Warnf("Cannot use both --notify-url (given: %v) and --slack-hook-url (given: %v) flags. Kured will only use --notify-url flag", slackHookURL, notifyURL)
return validateNotificationURL(notifyURL, "")
case notifyURL != "":
return stripQuotes(notifyURL)
case slackHookURL != "":
log.Warnf("Deprecated flag(s). Please use --notify-url flag instead.")
parsedURL, err := url.Parse(stripQuotes(slackHookURL))
if err != nil {
log.Errorf("slack-hook-url is not properly formatted... no notification will be sent: %v\n", err)
return ""
}
if len(strings.Split(strings.Trim(parsedURL.Path, "/services/"), "/")) != 3 {
log.Errorf("slack-hook-url is not properly formatted... no notification will be sent: unexpected number of / in URL\n")
return ""
}
return fmt.Sprintf("slack://%s", strings.Trim(parsedURL.Path, "/services/"))
}
return ""
}
// LoadFromEnv attempts to load environment variables corresponding to flags.
// It looks for an environment variable with the uppercase version of the flag name (prefixed by EnvPrefix).
func LoadFromEnv() {
flag.VisitAll(func(f *flag.Flag) {
envVarName := fmt.Sprintf("%s_%s", EnvPrefix, strings.ToUpper(strings.ReplaceAll(f.Name, "-", "_")))
if envValue, exists := os.LookupEnv(envVarName); exists {
switch f.Value.Type() {
case "int":
if parsedVal, err := strconv.Atoi(envValue); err == nil {
err := flag.Set(f.Name, strconv.Itoa(parsedVal))
if err != nil {
fmt.Printf("cannot set flag %s from env var named %s", f.Name, envVarName)
os.Exit(1)
} // Set int flag
} else {
fmt.Printf("Invalid value for env var named %s", envVarName)
os.Exit(1)
}
case "string":
err := flag.Set(f.Name, envValue)
if err != nil {
fmt.Printf("cannot set flag %s from env{%s}: %s\n", f.Name, envVarName, envValue)
os.Exit(1)
} // Set string flag
case "bool":
if parsedVal, err := strconv.ParseBool(envValue); err == nil {
err := flag.Set(f.Name, strconv.FormatBool(parsedVal))
if err != nil {
fmt.Printf("cannot set flag %s from env{%s}: %s\n", f.Name, envVarName, envValue)
os.Exit(1)
} // Set boolean flag
} else {
fmt.Printf("Invalid value for %s: %s\n", envVarName, envValue)
os.Exit(1)
}
case "duration":
// Set duration from the environment variable (e.g., "1h30m")
if _, err := time.ParseDuration(envValue); err == nil {
flag.Set(f.Name, envValue)
} else {
fmt.Printf("Invalid duration for %s: %s\n", envVarName, envValue)
os.Exit(1)
}
case "regexp":
// For regexp, set it from the environment variable
flag.Set(f.Name, envValue)
case "stringSlice":
// For stringSlice, split the environment variable by commas and set it
err := flag.Set(f.Name, envValue)
if err != nil {
fmt.Printf("cannot set flag %s from env{%s}: %s\n", f.Name, envVarName, envValue)
os.Exit(1)
}
default:
fmt.Printf("Unsupported flag type for %s\n", f.Name)
}
}
})
}
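As an aside, the environment variable that LoadFromEnv looks up is derived purely from the flag name. A minimal, self-contained sketch of that naming rule, outside the diff (the flag name below is only an illustration):

package main

import (
    "fmt"
    "strings"
)

// envVarFor mirrors the rule used in LoadFromEnv: prefix + "_" +
// the flag name upper-cased, with dashes replaced by underscores.
func envVarFor(prefix, flagName string) string {
    return fmt.Sprintf("%s_%s", prefix, strings.ToUpper(strings.ReplaceAll(flagName, "-", "_")))
}

func main() {
    fmt.Println(envVarFor("KURED", "drain-grace-period")) // KURED_DRAIN_GRACE_PERIOD
}
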
// stripQuotes removes any literal single or double quote chars that surround a string
@@ -280,218 +397,6 @@ func stripQuotes(str string) string {
return str
}
// bindViper initializes viper and binds command flags with environment variables
func bindViper(cmd *cobra.Command, args []string) error {
v := viper.New()
v.SetEnvPrefix(EnvPrefix)
v.AutomaticEnv()
bindFlags(cmd, v)
return nil
}
// bindFlags binds each cobra flag to its associated viper configuration (environment variable)
func bindFlags(cmd *cobra.Command, v *viper.Viper) {
cmd.Flags().VisitAll(func(f *pflag.Flag) {
// Environment variables can't have dashes in them, so bind them to their equivalent keys with underscores
if strings.Contains(f.Name, "-") {
v.BindEnv(f.Name, flagToEnvVar(f.Name))
}
// Apply the viper config value to the flag when the flag is not set and viper has a value
if !f.Changed && v.IsSet(f.Name) {
val := v.Get(f.Name)
log.Infof("Binding %s command flag to environment variable: %s", f.Name, flagToEnvVar(f.Name))
cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val))
}
})
}
// flagToEnvVar converts command flag name to equivalent environment variable name
func flagToEnvVar(flag string) string {
envVarSuffix := strings.ToUpper(strings.ReplaceAll(flag, "-", "_"))
return fmt.Sprintf("%s_%s", EnvPrefix, envVarSuffix)
}
// buildHostCommand writes a new command to run in the host namespace
// Rancher-based setups need a different pid
func buildHostCommand(pid int, command []string) []string {
// From the container, we nsenter into the proper PID to run the hostCommand.
// For this, the kured daemonset needs to be configured with hostPID:true and privileged:true
cmd := []string{"/usr/bin/nsenter", fmt.Sprintf("-m/proc/%d/ns/mnt", pid), "--"}
cmd = append(cmd, command...)
return cmd
}
func rebootRequired(sentinelCommand []string) bool {
cmd := util.NewCommand(sentinelCommand[0], sentinelCommand[1:]...)
if err := cmd.Run(); err != nil {
switch err := err.(type) {
case *exec.ExitError:
// We assume a non-zero exit code means 'reboot not required', but of course
// the user could have misconfigured the sentinel command or something else
// went wrong during its execution. In that case, not entering a reboot loop
// is the right thing to do, and we are logging stdout/stderr of the command
// so it should be obvious what is wrong.
if cmd.ProcessState.ExitCode() != 1 {
log.Warnf("sentinel command ended with unexpected exit code: %v", cmd.ProcessState.ExitCode())
}
return false
default:
// Something was grossly misconfigured, such as the command path being wrong.
log.Fatalf("Error invoking sentinel command: %v", err)
}
}
return true
}
// RebootBlocker interface should be implemented by types
// to know if their instantiations should block a reboot
type RebootBlocker interface {
isBlocked() bool
}
// PrometheusBlockingChecker contains info for connecting
// to prometheus, and can give info about whether a reboot should be blocked
type PrometheusBlockingChecker struct {
// prometheusClient to make prometheus-go-client and api config available
// into the PrometheusBlockingChecker struct
promClient *alerts.PromClient
// regexp used to get alerts
filter *regexp.Regexp
// bool to indicate if only firing alerts should be considered
firingOnly bool
// bool to indicate that we're only blocking on alerts which match the filter
filterMatchOnly bool
}
// KubernetesBlockingChecker contains info for connecting
// to k8s, and can give info about whether a reboot should be blocked
type KubernetesBlockingChecker struct {
// client used to contact kubernetes API
client *kubernetes.Clientset
nodename string
// list used to filter pods (podSelector)
filter []string
}
func (pb PrometheusBlockingChecker) isBlocked() bool {
alertNames, err := pb.promClient.ActiveAlerts(pb.filter, pb.firingOnly, pb.filterMatchOnly)
if err != nil {
log.Warnf("Reboot blocked: prometheus query error: %v", err)
return true
}
count := len(alertNames)
if count > 10 {
alertNames = append(alertNames[:10], "...")
}
if count > 0 {
log.Warnf("Reboot blocked: %d active alerts: %v", count, alertNames)
return true
}
return false
}
func (kb KubernetesBlockingChecker) isBlocked() bool {
fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown", kb.nodename)
for _, labelSelector := range kb.filter {
podList, err := kb.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
FieldSelector: fieldSelector,
Limit: 10})
if err != nil {
log.Warnf("Reboot blocked: pod query error: %v", err)
return true
}
if len(podList.Items) > 0 {
podNames := make([]string, 0, len(podList.Items))
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
}
if len(podList.Continue) > 0 {
podNames = append(podNames, "...")
}
log.Warnf("Reboot blocked: matching pods: %v", podNames)
return true
}
}
return false
}
func rebootBlocked(blockers ...RebootBlocker) bool {
for _, blocker := range blockers {
if blocker.isBlocked() {
return true
}
}
return false
}
func holding(lock *daemonsetlock.DaemonSetLock, metadata interface{}, isMultiLock bool) bool {
var holding bool
var err error
if isMultiLock {
holding, err = lock.TestMultiple()
} else {
holding, err = lock.Test(metadata)
}
if err != nil {
log.Fatalf("Error testing lock: %v", err)
}
if holding {
log.Infof("Holding lock")
}
return holding
}
func acquire(lock *daemonsetlock.DaemonSetLock, metadata interface{}, TTL time.Duration, maxOwners int) bool {
var holding bool
var holder string
var err error
if maxOwners > 1 {
var holders []string
holding, holders, err = lock.AcquireMultiple(metadata, TTL, maxOwners)
holder = strings.Join(holders, ",")
} else {
holding, holder, err = lock.Acquire(metadata, TTL)
}
switch {
case err != nil:
log.Fatalf("Error acquiring lock: %v", err)
return false
case !holding:
log.Warnf("Lock already held: %v", holder)
return false
default:
log.Infof("Acquired reboot lock")
return true
}
}
func throttle(releaseDelay time.Duration) {
if releaseDelay > 0 {
log.Infof("Delaying lock release by %v", releaseDelay)
time.Sleep(releaseDelay)
}
}
func release(lock *daemonsetlock.DaemonSetLock, isMultiLock bool) {
log.Infof("Releasing lock")
var err error
if isMultiLock {
err = lock.ReleaseMultiple()
} else {
err = lock.Release()
}
if err != nil {
log.Fatalf("Error releasing lock: %v", err)
}
}
func drain(client *kubernetes.Clientset, node *v1.Node) error {
nodename := node.GetName()
@@ -556,9 +461,9 @@ func uncordon(client *kubernetes.Clientset, node *v1.Node) error {
return nil
}
func maintainRebootRequiredMetric(nodeID string, sentinelCommand []string) {
func maintainRebootRequiredMetric(nodeID string, checker checkers.Checker) {
for {
if rebootRequired(sentinelCommand) {
if checker.RebootRequired() {
rebootRequiredGauge.WithLabelValues(nodeID).Set(1)
} else {
rebootRequiredGauge.WithLabelValues(nodeID).Set(0)
@@ -567,11 +472,6 @@ func maintainRebootRequiredMetric(nodeID string, sentinelCommand []string) {
}
}
// nodeMeta is used to remember information across reboots
type nodeMeta struct {
Unschedulable bool `json:"unschedulable"`
}
func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
@@ -646,30 +546,23 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri
}
}
func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []string, window *timewindow.TimeWindow, TTL time.Duration, releaseDelay time.Duration) {
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
}
func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation)
nodeMeta := nodeMeta{}
source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, 1*time.Minute)
for range tick {
if holding(lock, &nodeMeta, concurrency > 1) {
holding, lockData, err := lock.Holding()
if err != nil {
log.Errorf("Error testing lock: %v", err)
}
if holding {
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
log.Errorf("Error retrieving node object via k8s API: %v", err)
continue
}
if !nodeMeta.Unschedulable {
if !lockData.Metadata.Unschedulable {
err = uncordon(client, node)
if err != nil {
log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
@@ -687,7 +580,7 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
// And (2) check if we previously annotated the node that it was in the process of being rebooted,
// And finally (3) if it has that annotation, to delete it.
// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
if annotateNodes && !rebootRequired(sentinelCommand) {
if annotateNodes && !checker.RebootRequired() {
if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
if err != nil {
@@ -695,8 +588,12 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
}
}
}
throttle(releaseDelay)
release(lock, concurrency > 1)
err = lock.Release()
if err != nil {
log.Errorf("Error releasing lock, will retry: %v", err)
continue
}
break
} else {
break
@@ -706,16 +603,10 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
// Remove taint immediately during startup to quickly allow scheduling again.
if !rebootRequired(sentinelCommand) {
if !checker.RebootRequired() {
preferNoScheduleTaint.Disable()
}
// instantiate prometheus client
promClient, err := alerts.NewPromClient(papi.Config{Address: prometheusURL})
if err != nil {
log.Fatal("Unable to create prometheus client: ", err)
}
source = rand.NewSource(time.Now().UnixNano())
tick = delaytick.New(source, period)
for range tick {
@@ -725,7 +616,7 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
continue
}
if !rebootRequired(sentinelCommand) {
if !checker.RebootRequired() {
log.Infof("Reboot not required")
preferNoScheduleTaint.Disable()
continue
@@ -735,7 +626,8 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
if err != nil {
log.Fatalf("Error retrieving node object via k8s API: %v", err)
}
nodeMeta.Unschedulable = node.Spec.Unschedulable
nodeMeta := daemonsetlock.NodeMeta{Unschedulable: node.Spec.Unschedulable}
var timeNowString string
if annotateNodes {
@@ -753,32 +645,47 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
}
}
var blockCheckers []RebootBlocker
var blockCheckers []blockers.RebootBlocker
if prometheusURL != "" {
blockCheckers = append(blockCheckers, PrometheusBlockingChecker{promClient: promClient, filter: alertFilter, firingOnly: alertFiringOnly, filterMatchOnly: alertFilterMatchOnly})
blockCheckers = append(blockCheckers, blockers.NewPrometheusBlockingChecker(papi.Config{Address: prometheusURL}, alertFilter.Regexp, alertFiringOnly, alertFilterMatchOnly))
}
if podSelectors != nil {
blockCheckers = append(blockCheckers, KubernetesBlockingChecker{client: client, nodename: nodeID, filter: podSelectors})
blockCheckers = append(blockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, podSelectors))
}
var rebootRequiredBlockCondition string
if rebootBlocked(blockCheckers...) {
if blockers.RebootBlocked(blockCheckers...) {
rebootRequiredBlockCondition = ", but blocked at this time"
continue
}
log.Infof("Reboot required%s", rebootRequiredBlockCondition)
if !holding(lock, &nodeMeta, concurrency > 1) && !acquire(lock, &nodeMeta, TTL, concurrency) {
// Prefer to not schedule pods onto this node to avoid draining the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
holding, _, err := lock.Holding()
if err != nil {
log.Errorf("Error testing lock: %v", err)
}
if !holding {
acquired, holder, err := lock.Acquire(nodeMeta)
if err != nil {
log.Errorf("Error acquiring lock: %v", err)
}
if !acquired {
log.Warnf("Lock already held: %v", holder)
// Prefer to not schedule pods onto this node to avoid draining the same pod multiple times.
preferNoScheduleTaint.Enable()
continue
}
}
err = drain(client, node)
if err != nil {
if !forceReboot {
log.Errorf("Unable to cordon or drain %s: %v, will release lock and retry cordon and drain before rebooting when lock is next acquired", node.GetName(), err)
release(lock, concurrency > 1)
err = lock.Release()
if err != nil {
log.Errorf("Error releasing lock: %v", err)
}
log.Infof("Performing a best-effort uncordon after failed cordon and drain")
uncordon(client, node)
continue
@@ -795,108 +702,15 @@ func rebootAsRequired(nodeID string, booter reboot.Reboot, sentinelCommand []str
log.Warnf("Error notifying: %v", err)
}
}
log.Infof("Triggering reboot for node %v", nodeID)
booter.Reboot()
err = rebooter.Reboot()
if err != nil {
log.Fatalf("Unable to reboot node: %v", err)
}
for {
log.Infof("Waiting for reboot")
time.Sleep(time.Minute)
}
}
}
// buildSentinelCommand creates the shell command line which will need wrapping to escape
// the container boundaries
func buildSentinelCommand(rebootSentinelFile string, rebootSentinelCommand string) []string {
if rebootSentinelCommand != "" {
cmd, err := shlex.Split(rebootSentinelCommand)
if err != nil {
log.Fatalf("Error parsing provided sentinel command: %v", err)
}
return cmd
}
return []string{"test", "-f", rebootSentinelFile}
}
// parseRebootCommand creates the shell command line which will need wrapping to escape
// the container boundaries
func parseRebootCommand(rebootCommand string) []string {
command, err := shlex.Split(rebootCommand)
if err != nil {
log.Fatalf("Error parsing provided reboot command: %v", err)
}
return command
}
func root(cmd *cobra.Command, args []string) {
if logFormat == "json" {
log.SetFormatter(&log.JSONFormatter{})
}
log.Infof("Kubernetes Reboot Daemon: %s", version)
if nodeID == "" {
log.Fatal("KURED_NODE_ID environment variable required")
}
window, err := timewindow.New(rebootDays, rebootStart, rebootEnd, timezone)
if err != nil {
log.Fatalf("Failed to build time window: %v", err)
}
sentinelCommand := buildSentinelCommand(rebootSentinelFile, rebootSentinelCommand)
restartCommand := parseRebootCommand(rebootCommand)
log.Infof("Node ID: %s", nodeID)
log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
if lockTTL > 0 {
log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
} else {
log.Info("Lock TTL not set, lock will remain until being released")
}
if lockReleaseDelay > 0 {
log.Infof("Lock release delay set, lock release will be delayed by: %v", lockReleaseDelay)
} else {
log.Info("Lock release delay not set, lock will be released immediately after rebooting")
}
log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
log.Infof("Blocking Pod Selectors: %v", podSelectors)
log.Infof("Reboot schedule: %v", window)
log.Infof("Reboot check command: %s every %v", sentinelCommand, period)
log.Infof("Concurrency: %v", concurrency)
log.Infof("Reboot method: %s", rebootMethod)
if rebootCommand == MethodCommand {
log.Infof("Reboot command: %s", restartCommand)
} else {
log.Infof("Reboot signal: %v", rebootSignal)
}
if annotateNodes {
log.Infof("Will annotate nodes during kured reboot operations")
}
// To run those commands as it was the host, we'll use nsenter
// Relies on hostPID:true and privileged:true to enter host mount space
// PID set to 1, until we have a better discovery mechanism.
hostRestartCommand := buildHostCommand(1, restartCommand)
// Only wrap sentinel-command with nsenter, if a custom-command was configured, otherwise use the host-path mount
hostSentinelCommand := sentinelCommand
if rebootSentinelCommand != "" {
hostSentinelCommand = buildHostCommand(1, sentinelCommand)
}
var booter reboot.Reboot
if rebootMethod == MethodCommand {
booter = reboot.NewCommandReboot(nodeID, hostRestartCommand)
} else if rebootMethod == MethodSignal {
booter = reboot.NewSignalReboot(nodeID, rebootSignal)
} else {
log.Fatalf("Invalid reboot-method configured: %s", rebootMethod)
}
go rebootAsRequired(nodeID, booter, hostSentinelCommand, window, lockTTL, lockReleaseDelay)
go maintainRebootRequiredMetric(nodeID, hostSentinelCommand)
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(fmt.Sprintf("%s:%d", metricsHost, metricsPort), nil))
}


@@ -3,61 +3,29 @@ package main
import (
"reflect"
"testing"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/kubereboot/kured/pkg/alerts"
assert "gotest.tools/v3/assert"
papi "github.com/prometheus/client_golang/api"
)
type BlockingChecker struct {
blocking bool
}
func TestValidateNotificationURL(t *testing.T) {
func (fbc BlockingChecker) isBlocked() bool {
return fbc.blocking
}
var _ RebootBlocker = BlockingChecker{} // Verify that Type implements Interface.
var _ RebootBlocker = (*BlockingChecker)(nil) // Verify that *Type implements Interface.
func Test_flagCheck(t *testing.T) {
var cmd *cobra.Command
var args []string
slackHookURL = "https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"
expected := "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"
flagCheck(cmd, args)
if notifyURL != expected {
t.Errorf("Slack URL Parsing is wrong: expecting %s but got %s\n", expected, notifyURL)
tests := []struct {
name string
slackHookURL string
notifyURL string
expected string
}{
{"slackHookURL only works fine", "https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET", "", "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"},
{"slackHookURL and notify URL together only keeps notifyURL", "\"https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET\"", "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com", "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com"},
{"slackHookURL removes extraneous double quotes", "\"https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET\"", "", "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"},
{"slackHookURL removes extraneous single quotes", "'https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET'", "", "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"},
{"notifyURL removes extraneous double quotes", "", "\"teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com\"", "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com"},
{"notifyURL removes extraneous single quotes", "", "'teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com'", "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com"},
}
// validate that surrounding quotes are stripped
slackHookURL = "\"https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET\""
expected = "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"
flagCheck(cmd, args)
if notifyURL != expected {
t.Errorf("Slack URL Parsing is wrong: expecting %s but got %s\n", expected, notifyURL)
}
slackHookURL = "'https://hooks.slack.com/services/BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET'"
expected = "slack://BLABLABA12345/IAM931A0VERY/COMPLICATED711854TOKEN1SET"
flagCheck(cmd, args)
if notifyURL != expected {
t.Errorf("Slack URL Parsing is wrong: expecting %s but got %s\n", expected, notifyURL)
}
slackHookURL = ""
notifyURL = "\"teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com\""
expected = "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com"
flagCheck(cmd, args)
if notifyURL != expected {
t.Errorf("notifyURL Parsing is wrong: expecting %s but got %s\n", expected, notifyURL)
}
notifyURL = "'teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com'"
expected = "teams://79b4XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX@acd8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/204cXXXXXXXXXXXXXXXXXXXXXXXXXXXX/a1f8XXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX?host=XXXX.webhook.office.com"
flagCheck(cmd, args)
if notifyURL != expected {
t.Errorf("notifyURL Parsing is wrong: expecting %s but got %s\n", expected, notifyURL)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := validateNotificationURL(tt.notifyURL, tt.slackHookURL); !reflect.DeepEqual(got, tt.expected) {
t.Errorf("validateNotificationURL() = %v, expected %v", got, tt.expected)
}
})
}
}
@@ -106,205 +74,3 @@ func Test_stripQuotes(t *testing.T) {
})
}
}
func Test_rebootBlocked(t *testing.T) {
noCheckers := []RebootBlocker{}
nonblockingChecker := BlockingChecker{blocking: false}
blockingChecker := BlockingChecker{blocking: true}
// Instantiate a prometheusClient with a broken_url
promClient, err := alerts.NewPromClient(papi.Config{Address: "broken_url"})
if err != nil {
log.Fatal("Can't create prometheusClient: ", err)
}
brokenPrometheusClient := PrometheusBlockingChecker{promClient: promClient, filter: nil, firingOnly: false}
type args struct {
blockers []RebootBlocker
}
tests := []struct {
name string
args args
want bool
}{
{
name: "Do not block on no blocker defined",
args: args{blockers: noCheckers},
want: false,
},
{
name: "Ensure a blocker blocks",
args: args{blockers: []RebootBlocker{blockingChecker}},
want: true,
},
{
name: "Ensure a non-blocker doesn't block",
args: args{blockers: []RebootBlocker{nonblockingChecker}},
want: false,
},
{
name: "Ensure one blocker is enough to block",
args: args{blockers: []RebootBlocker{nonblockingChecker, blockingChecker}},
want: true,
},
{
name: "Do block on error contacting prometheus API",
args: args{blockers: []RebootBlocker{brokenPrometheusClient}},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := rebootBlocked(tt.args.blockers...); got != tt.want {
t.Errorf("rebootBlocked() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildHostCommand(t *testing.T) {
type args struct {
pid int
command []string
}
tests := []struct {
name string
args args
want []string
}{
{
name: "Ensure command will run with nsenter",
args: args{pid: 1, command: []string{"ls", "-Fal"}},
want: []string{"/usr/bin/nsenter", "-m/proc/1/ns/mnt", "--", "ls", "-Fal"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildHostCommand(tt.args.pid, tt.args.command); !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildHostCommand() = %v, want %v", got, tt.want)
}
})
}
}
func Test_buildSentinelCommand(t *testing.T) {
type args struct {
rebootSentinelFile string
rebootSentinelCommand string
}
tests := []struct {
name string
args args
want []string
}{
{
name: "Ensure a sentinelFile generates a shell 'test' command with the right file",
args: args{
rebootSentinelFile: "/test1",
rebootSentinelCommand: "",
},
want: []string{"test", "-f", "/test1"},
},
{
name: "Ensure a sentinelCommand has priority over a sentinelFile if both are provided (because sentinelFile is always provided)",
args: args{
rebootSentinelFile: "/test1",
rebootSentinelCommand: "/sbin/reboot-required -r",
},
want: []string{"/sbin/reboot-required", "-r"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildSentinelCommand(tt.args.rebootSentinelFile, tt.args.rebootSentinelCommand); !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildSentinelCommand() = %v, want %v", got, tt.want)
}
})
}
}
func Test_parseRebootCommand(t *testing.T) {
type args struct {
rebootCommand string
}
tests := []struct {
name string
args args
want []string
}{
{
name: "Ensure a reboot command is properly parsed",
args: args{
rebootCommand: "/sbin/systemctl reboot",
},
want: []string{"/sbin/systemctl", "reboot"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseRebootCommand(tt.args.rebootCommand); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseRebootCommand() = %v, want %v", got, tt.want)
}
})
}
}
func Test_rebootRequired(t *testing.T) {
type args struct {
sentinelCommand []string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "Ensure rc = 0 means reboot required",
args: args{
sentinelCommand: []string{"true"},
},
want: true,
},
{
name: "Ensure rc != 0 means reboot NOT required",
args: args{
sentinelCommand: []string{"false"},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := rebootRequired(tt.args.sentinelCommand); got != tt.want {
t.Errorf("rebootRequired() = %v, want %v", got, tt.want)
}
})
}
}
func Test_rebootRequired_fatals(t *testing.T) {
cases := []struct {
param []string
expectFatal bool
}{
{
param: []string{"true"},
expectFatal: false,
},
{
param: []string{"./babar"},
expectFatal: true,
},
}
defer func() { log.StandardLogger().ExitFunc = nil }()
var fatal bool
log.StandardLogger().ExitFunc = func(int) { fatal = true }
for _, c := range cases {
fatal = false
rebootRequired(c.param)
assert.Equal(t, c.expectFatal, fatal)
}
}


@@ -5,14 +5,14 @@ import (
)
type regexpValue struct {
value **regexp.Regexp
*regexp.Regexp
}
func (rev *regexpValue) String() string {
if *rev.value == nil {
if rev.Regexp == nil {
return ""
}
return (*rev.value).String()
return rev.Regexp.String()
}
func (rev *regexpValue) Set(s string) error {
@@ -20,12 +20,11 @@ func (rev *regexpValue) Set(s string) error {
if err != nil {
return err
}
*rev.value = value
rev.Regexp = value
return nil
}
// Type method returns the type of the flag as a string
func (rev *regexpValue) Type() string {
return "regexp.Regexp"
return "regexp"
}
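For context, the refactored regexpValue satisfies the pflag.Value interface (String, Set, Type), so it can be registered directly with flag.Var, as main.go now does. A minimal sketch, assuming the regexpValue type above is available in the same package; the flag name and pattern are illustrative:

package main

import (
    "fmt"

    flag "github.com/spf13/pflag"
)

func main() {
    var filter regexpValue
    flag.Var(&filter, "alert-filter-regexp", "alert names to ignore")
    flag.Parse() // e.g. --alert-filter-regexp='^Watchdog$'

    // The embedded *regexp.Regexp stays nil until pflag calls Set.
    if filter.Regexp != nil && filter.MatchString("Watchdog") {
        fmt.Println("alert name matches the filter")
    }
}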

internal/validators.go (new file, 35 lines)

@@ -0,0 +1,35 @@
package internal
import (
"fmt"
"github.com/kubereboot/kured/pkg/checkers"
"github.com/kubereboot/kured/pkg/reboot"
log "github.com/sirupsen/logrus"
)
// NewRebooter validates the rebootMethod, rebootCommand, and rebootSignal input,
// then chains to the right constructor.
func NewRebooter(rebootMethod string, rebootCommand string, rebootSignal int) (reboot.Rebooter, error) {
switch {
case rebootMethod == "command":
log.Infof("Reboot command: %s", rebootCommand)
return reboot.NewCommandRebooter(rebootCommand)
case rebootMethod == "signal":
log.Infof("Reboot signal: %d", rebootSignal)
return reboot.NewSignalRebooter(rebootSignal)
default:
return nil, fmt.Errorf("invalid reboot-method configured %s, expected signal or command", rebootMethod)
}
}
// NewRebootChecker validates the rebootSentinelCommand, rebootSentinelFile input,
// then chains to the right constructor.
func NewRebootChecker(rebootSentinelCommand string, rebootSentinelFile string) (checkers.Checker, error) {
// An override of rebootSentinelCommand means a privileged command
if rebootSentinelCommand != "" {
log.Infof("Sentinel checker is (privileged) user provided command: %s", rebootSentinelCommand)
return checkers.NewCommandChecker(rebootSentinelCommand)
}
log.Infof("Sentinel checker is (unprivileged) testing for the presence of: %s", rebootSentinelFile)
return checkers.NewFileRebootChecker(rebootSentinelFile)
}
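A minimal usage sketch of these two constructors, mirroring how main.go wires them together; the values are illustrative, and note that the internal package is only importable from within the kured module itself:

package main

import (
    "github.com/kubereboot/kured/internal"
    log "github.com/sirupsen/logrus"
)

func main() {
    // "command" and "signal" are the two methods NewRebooter accepts.
    rebooter, err := internal.NewRebooter("command", "/bin/systemctl reboot", 34+5)
    if err != nil {
        log.Fatalf("Failed to build rebooter: %v", err)
    }

    // An empty sentinel command falls back to the unprivileged file check.
    checker, err := internal.NewRebootChecker("", "/var/run/reboot-required")
    if err != nil {
        log.Fatalf("Failed to build reboot checker: %v", err)
    }

    if checker.RebootRequired() {
        if err := rebooter.Reboot(); err != nil {
            log.Fatalf("Unable to reboot node: %v", err)
        }
    }
}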


@@ -1,77 +0,0 @@
package alerts
import (
"context"
"fmt"
"regexp"
"sort"
"time"
papi "github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
// PromClient is a wrapper around the Prometheus Client interface and implements the api
// This way, the PromClient can be instantiated with the configuration the Client needs, and
// the ability to use the methods the api has, like Query and so on.
type PromClient struct {
papi papi.Client
api v1.API
}
// NewPromClient creates a new client to the Prometheus API.
// It returns an error on any problem.
func NewPromClient(conf papi.Config) (*PromClient, error) {
promClient, err := papi.NewClient(conf)
if err != nil {
return nil, err
}
client := PromClient{papi: promClient, api: v1.NewAPI(promClient)}
return &client, nil
}
// ActiveAlerts is a method of type PromClient, it returns a list of names of active alerts
// (e.g. pending or firing), filtered by the supplied regexp or by the includeLabels query.
// filter by regexp means when the regex finds the alert-name; the alert is excluded from the
// block-list and will NOT block rebooting. query by includeLabel means,
// if the query finds an alert, it will include it to the block-list and it WILL block rebooting.
func (p *PromClient) ActiveAlerts(filter *regexp.Regexp, firingOnly, filterMatchOnly bool) ([]string, error) {
// get all alerts from prometheus
value, _, err := p.api.Query(context.Background(), "ALERTS", time.Now())
if err != nil {
return nil, err
}
if value.Type() == model.ValVector {
if vector, ok := value.(model.Vector); ok {
activeAlertSet := make(map[string]bool)
for _, sample := range vector {
if alertName, isAlert := sample.Metric[model.AlertNameLabel]; isAlert && sample.Value != 0 {
if matchesRegex(filter, string(alertName), filterMatchOnly) && (!firingOnly || sample.Metric["alertstate"] == "firing") {
activeAlertSet[string(alertName)] = true
}
}
}
var activeAlerts []string
for activeAlert := range activeAlertSet {
activeAlerts = append(activeAlerts, activeAlert)
}
sort.Strings(activeAlerts)
return activeAlerts, nil
}
}
return nil, fmt.Errorf("Unexpected value type: %v", value)
}
func matchesRegex(filter *regexp.Regexp, alertName string, filterMatchOnly bool) bool {
if filter == nil {
return true
}
return filter.MatchString(string(alertName)) == filterMatchOnly
}

pkg/blockers/blockers.go (new file, 18 lines)

@@ -0,0 +1,18 @@
package blockers
// RebootBlocked checks whether any of the given blockers
// would block the reboot.
func RebootBlocked(blockers ...RebootBlocker) bool {
for _, blocker := range blockers {
if blocker.IsBlocked() {
return true
}
}
return false
}
// RebootBlocker interface should be implemented by types
// to know if their instantiations should block a reboot
type RebootBlocker interface {
IsBlocked() bool
}


@@ -0,0 +1,65 @@
package blockers
import (
papi "github.com/prometheus/client_golang/api"
"testing"
)
type BlockingChecker struct {
blocking bool
}
func (fbc BlockingChecker) IsBlocked() bool {
return fbc.blocking
}
func Test_rebootBlocked(t *testing.T) {
noCheckers := []RebootBlocker{}
nonblockingChecker := BlockingChecker{blocking: false}
blockingChecker := BlockingChecker{blocking: true}
// Instantiate a prometheusClient with a broken_url
brokenPrometheusClient := NewPrometheusBlockingChecker(papi.Config{Address: "broken_url"}, nil, false, false)
type args struct {
blockers []RebootBlocker
}
tests := []struct {
name string
args args
want bool
}{
{
name: "Do not block on no blocker defined",
args: args{blockers: noCheckers},
want: false,
},
{
name: "Ensure a blocker blocks",
args: args{blockers: []RebootBlocker{blockingChecker}},
want: true,
},
{
name: "Ensure a non-blocker doesn't block",
args: args{blockers: []RebootBlocker{nonblockingChecker}},
want: false,
},
{
name: "Ensure one blocker is enough to block",
args: args{blockers: []RebootBlocker{nonblockingChecker, blockingChecker}},
want: true,
},
{
name: "Do block on error contacting prometheus API",
args: args{blockers: []RebootBlocker{brokenPrometheusClient}},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RebootBlocked(tt.args.blockers...); got != tt.want {
t.Errorf("rebootBlocked() = %v, want %v", got, tt.want)
}
})
}
}


@@ -0,0 +1,61 @@
package blockers
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// Compile-time checks to ensure the type implements the interface
var (
_ RebootBlocker = (*KubernetesBlockingChecker)(nil)
)
// KubernetesBlockingChecker contains info for connecting
// to k8s, and can give info about whether a reboot should be blocked
type KubernetesBlockingChecker struct {
// client used to contact kubernetes API
client *kubernetes.Clientset
nodeName string
// list used to filter pods (podSelector)
filter []string
}
func NewKubernetesBlockingChecker(client *kubernetes.Clientset, nodename string, podSelectors []string) *KubernetesBlockingChecker {
return &KubernetesBlockingChecker{
client: client,
nodeName: nodename,
filter: podSelectors,
}
}
// IsBlocked for the KubernetesBlockingChecker will check if a pod, for the node, is preventing
// the reboot. It will warn in the logs about blocking, but does not return an error.
func (kb KubernetesBlockingChecker) IsBlocked() bool {
fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown", kb.nodeName)
for _, labelSelector := range kb.filter {
podList, err := kb.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
FieldSelector: fieldSelector,
Limit: 10})
if err != nil {
log.Warnf("Reboot blocked: pod query error: %v", err)
return true
}
if len(podList.Items) > 0 {
podNames := make([]string, 0, len(podList.Items))
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
}
if len(podList.Continue) > 0 {
podNames = append(podNames, "...")
}
log.Warnf("Reboot blocked: matching pods: %v", podNames)
return true
}
}
return false
}

pkg/blockers/prometheus.go (new file, 118 lines)

@@ -0,0 +1,118 @@
package blockers
import (
"context"
"fmt"
papi "github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
"regexp"
"sort"
"time"
)
// Compile-time checks to ensure the type implements the interface
var (
_ RebootBlocker = (*PrometheusBlockingChecker)(nil)
)
// PrometheusBlockingChecker contains info for connecting
// to prometheus, and can give info about whether a reboot should be blocked
type PrometheusBlockingChecker struct {
promConfig papi.Config
// regexp used to get alerts
filter *regexp.Regexp
// bool to indicate if only firing alerts should be considered
firingOnly bool
// bool to indicate that we're only blocking on alerts which match the filter
filterMatchOnly bool
// storing the promClient
promClient papi.Client
}
func NewPrometheusBlockingChecker(config papi.Config, alertFilter *regexp.Regexp, firingOnly bool, filterMatchOnly bool) PrometheusBlockingChecker {
promClient, _ := papi.NewClient(config)
return PrometheusBlockingChecker{
promConfig: config,
filter: alertFilter,
firingOnly: firingOnly,
filterMatchOnly: filterMatchOnly,
promClient: promClient,
}
}
// IsBlocked for the prometheus will check if there are active alerts matching
// the arguments given into the PrometheusBlockingChecker which would actively
// block the reboot.
// As of today, no blocker information is shared as a return of the method,
// and the information is simply logged.
func (pb PrometheusBlockingChecker) IsBlocked() bool {
alertNames, err := pb.ActiveAlerts()
if err != nil {
log.Warnf("Reboot blocked: prometheus query error: %v", err)
return true
}
count := len(alertNames)
if count > 10 {
alertNames = append(alertNames[:10], "...")
}
if count > 0 {
log.Warnf("Reboot blocked: %d active alerts: %v", count, alertNames)
return true
}
return false
}
// MetricLabel is used to give a fancier name
// than the type to the label for rebootBlockedCounter
func (pb PrometheusBlockingChecker) MetricLabel() string {
return "prometheus"
}
// ActiveAlerts is a method of type promClient, it returns a list of names of active alerts
// (e.g. pending or firing), filtered by the supplied regexp or by the includeLabels query.
// filter by regexp means when the regexp finds the alert-name; the alert is excluded from the
// block-list and will NOT block rebooting. query by includeLabel means,
// if the query finds an alert, it will include it to the block-list, and it WILL block rebooting.
func (pb PrometheusBlockingChecker) ActiveAlerts() ([]string, error) {
api := v1.NewAPI(pb.promClient)
// get all alerts from prometheus
value, _, err := api.Query(context.Background(), "ALERTS", time.Now())
if err != nil {
return nil, err
}
if value.Type() == model.ValVector {
if vector, ok := value.(model.Vector); ok {
activeAlertSet := make(map[string]bool)
for _, sample := range vector {
if alertName, isAlert := sample.Metric[model.AlertNameLabel]; isAlert && sample.Value != 0 {
if matchesRegex(pb.filter, string(alertName), pb.filterMatchOnly) && (!pb.firingOnly || sample.Metric["alertstate"] == "firing") {
activeAlertSet[string(alertName)] = true
}
}
}
var activeAlerts []string
for activeAlert := range activeAlertSet {
activeAlerts = append(activeAlerts, activeAlert)
}
sort.Strings(activeAlerts)
return activeAlerts, nil
}
}
return nil, fmt.Errorf("unexpected value type %v", value)
}
func matchesRegex(filter *regexp.Regexp, alertName string, filterMatchOnly bool) bool {
if filter == nil {
return true
}
return filter.MatchString(alertName) == filterMatchOnly
}
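The filter semantics described in the ActiveAlerts comment can be easy to misread, so here is a small standalone sketch of the matchesRegex logic above (the alert names and pattern are made up):

package main

import (
    "fmt"
    "regexp"
)

// Same rule as matchesRegex above: a nil filter matches everything;
// otherwise the regexp match result must equal filterMatchOnly.
func matchesRegex(filter *regexp.Regexp, alertName string, filterMatchOnly bool) bool {
    if filter == nil {
        return true
    }
    return filter.MatchString(alertName) == filterMatchOnly
}

func main() {
    filter := regexp.MustCompile("^Watchdog$")
    // filterMatchOnly=false: alerts matching the filter are ignored, everything else blocks.
    fmt.Println(matchesRegex(filter, "Watchdog", false))     // false -> ignored
    fmt.Println(matchesRegex(filter, "KubeNodeDown", false)) // true  -> blocks
    // filterMatchOnly=true: only alerts matching the filter block.
    fmt.Println(matchesRegex(filter, "Watchdog", true))      // true  -> blocks
    fmt.Println(matchesRegex(filter, "KubeNodeDown", true))  // false -> ignored
}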


@@ -1,4 +1,4 @@
package alerts
package blockers
import (
"log"
@@ -145,12 +145,9 @@ func TestActiveAlerts(t *testing.T) {
regex, _ := regexp.Compile(tc.rFilter)
// instantiate the prometheus client with the mockserver-address
p, err := NewPromClient(api.Config{Address: mockServer.URL})
if err != nil {
log.Fatal(err)
}
p := NewPrometheusBlockingChecker(api.Config{Address: mockServer.URL}, regex, tc.firingOnly, tc.filterMatchOnly)
result, err := p.ActiveAlerts(regex, tc.firingOnly, tc.filterMatchOnly)
result, err := p.ActiveAlerts()
if err != nil {
log.Fatal(err)
}

pkg/checkers/checker.go Normal file
View File

@@ -0,0 +1,98 @@
package checkers
import (
"fmt"
"github.com/google/shlex"
"github.com/kubereboot/kured/pkg/util"
log "github.com/sirupsen/logrus"
"os"
"os/exec"
)
// Checker is the standard interface used to check
// whether a reboot is required. Implementations must provide a
// RebootRequired method which returns a single boolean
// stating whether a reboot is expected or not.
type Checker interface {
RebootRequired() bool
}
// FileRebootChecker is the default reboot checker.
// It is unprivileged, and tests the presence of a file
type FileRebootChecker struct {
FilePath string
}
// RebootRequired checks for the presence of the sentinel file.
// This needs refactoring to also return an error, instead of swallowing it inside the method.
func (rc FileRebootChecker) RebootRequired() bool {
if _, err := os.Stat(rc.FilePath); err == nil {
return true
}
return false
}
// NewFileRebootChecker is the constructor for the file based reboot checker
// TODO: Add extra input validation on filePath string here
func NewFileRebootChecker(filePath string) (*FileRebootChecker, error) {
return &FileRebootChecker{
FilePath: filePath,
}, nil
}
// CommandChecker uses a custom command to check
// whether a reboot is required. There are two modes of behaviour:
// if Privileged is set, NamespacePid is used to enter
// the given PID's namespace when running the command.
type CommandChecker struct {
CheckCommand []string
NamespacePid int
Privileged bool
}
// RebootRequired for CommandChecker runs the check command without returning
// any eventual error. This should later be refactored to remove the util wrapper
// and return the errors, instead of logging them here.
func (rc CommandChecker) RebootRequired() bool {
var cmdline []string
if rc.Privileged {
cmdline = util.PrivilegedHostCommand(rc.NamespacePid, rc.CheckCommand)
} else {
cmdline = rc.CheckCommand
}
cmd := util.NewCommand(cmdline[0], cmdline[1:]...)
if err := cmd.Run(); err != nil {
switch err := err.(type) {
case *exec.ExitError:
// We assume a non-zero exit code means 'reboot not required', but of course
// the user could have misconfigured the sentinel command or something else
// went wrong during its execution. In that case, not entering a reboot loop
// is the right thing to do, and we are logging stdout/stderr of the command
// so it should be obvious what is wrong.
if cmd.ProcessState.ExitCode() != 1 {
log.Warnf("sentinel command ended with unexpected exit code: %v", cmd.ProcessState.ExitCode())
}
return false
default:
// Something was grossly misconfigured, such as the command path being wrong.
log.Fatalf("Error invoking sentinel command: %v", err)
}
}
return true
}
// NewCommandChecker is the constructor for the CommandChecker; by default
// it runs the check command in a privileged fashion.
func NewCommandChecker(sentinelCommand string) (*CommandChecker, error) {
cmd, err := shlex.Split(sentinelCommand)
if err != nil {
return nil, fmt.Errorf("error parsing provided sentinel command: %v", err)
}
return &CommandChecker{
CheckCommand: cmd,
NamespacePid: 1,
Privileged: true,
}, nil
}
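As a hedged illustration of the two checkers behind the Checker interface (the sentinel path and command below are placeholders, not defaults taken from this change):

package main

import (
    "github.com/kubereboot/kured/pkg/checkers"
    log "github.com/sirupsen/logrus"
)

func main() {
    var c checkers.Checker

    // File-based checker: a reboot is required when the sentinel file exists.
    fileChecker, err := checkers.NewFileRebootChecker("/var/run/reboot-required") // placeholder path
    if err != nil {
        log.Fatal(err)
    }
    c = fileChecker
    log.Infof("file sentinel: reboot required = %v", c.RebootRequired())

    // Command-based checker: exit code 0 means a reboot is required.
    // NewCommandChecker defaults to privileged mode, so the command is wrapped
    // with nsenter and runs in PID 1's mount namespace.
    cmdChecker, err := checkers.NewCommandChecker("test -f /var/run/reboot-required") // placeholder command
    if err != nil {
        log.Fatal(err)
    }
    c = cmdChecker
    log.Infof("sentinel command: reboot required = %v", c.RebootRequired())
}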

View File

@@ -0,0 +1,69 @@
package checkers
import (
log "github.com/sirupsen/logrus"
assert "gotest.tools/v3/assert"
"testing"
)
func Test_rebootRequired(t *testing.T) {
type args struct {
sentinelCommand []string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "Ensure rc = 0 means reboot required",
args: args{
sentinelCommand: []string{"true"},
},
want: true,
},
{
name: "Ensure rc != 0 means reboot NOT required",
args: args{
sentinelCommand: []string{"false"},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := CommandChecker{CheckCommand: tt.args.sentinelCommand, NamespacePid: 1, Privileged: false}
if got := a.RebootRequired(); got != tt.want {
t.Errorf("rebootRequired() = %v, want %v", got, tt.want)
}
})
}
}
func Test_rebootRequired_fatals(t *testing.T) {
cases := []struct {
param []string
expectFatal bool
}{
{
param: []string{"true"},
expectFatal: false,
},
{
param: []string{"./babar"},
expectFatal: true,
},
}
defer func() { log.StandardLogger().ExitFunc = nil }()
var fatal bool
log.StandardLogger().ExitFunc = func(int) { fatal = true }
for _, c := range cases {
fatal = false
a := CommandChecker{CheckCommand: c.param, NamespacePid: 1, Privileged: false}
a.RebootRequired()
assert.Equal(t, c.expectFatal, fatal)
}
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"strings"
"time"
v1 "k8s.io/api/apps/v1"
@@ -18,6 +20,21 @@ const (
k8sAPICallRetryTimeout = 5 * time.Minute // How long to wait until we determine that the k8s API is definitively unavailable
)
// Lock is the interface implemented by the reboot lock backends,
// covering acquisition, release, and ownership checks.
type Lock interface {
Acquire(NodeMeta) (bool, string, error)
Release() error
Holding() (bool, LockAnnotationValue, error)
}
// GenericLock carries the TTL and release delay shared by the lock implementations.
type GenericLock struct {
TTL time.Duration
releaseDelay time.Duration
}
// NodeMeta is the node metadata stored with the lock, recording whether the
// node was already unschedulable before kured acted.
type NodeMeta struct {
Unschedulable bool `json:"unschedulable"`
}
// DaemonSetLock holds all necessary information to do actions
// on the kured ds which holds lock info through annotations.
type DaemonSetLock struct {
@@ -28,25 +45,90 @@ type DaemonSetLock struct {
annotation string
}
type lockAnnotationValue struct {
// DaemonSetSingleLock holds all necessary information to do actions
// on the kured ds which holds lock info through annotations.
type DaemonSetSingleLock struct {
GenericLock
DaemonSetLock
}
// DaemonSetMultiLock holds all necessary information to do actions
// on the kured ds which holds lock info through annotations, valid
// for multiple nodes
type DaemonSetMultiLock struct {
GenericLock
DaemonSetLock
maxOwners int
}
// LockAnnotationValue contains the lock data,
// which allows persistence across reboots, particularly recording whether the
// node was already unschedulable before the kured-initiated reboot.
// To be modified when using another type of lock storage.
type LockAnnotationValue struct {
NodeID string `json:"nodeID"`
Metadata interface{} `json:"metadata,omitempty"`
Metadata NodeMeta `json:"metadata,omitempty"`
Created time.Time `json:"created"`
TTL time.Duration `json:"TTL"`
}
type multiLockAnnotationValue struct {
MaxOwners int `json:"maxOwners"`
LockAnnotations []lockAnnotationValue `json:"locks"`
LockAnnotations []LockAnnotationValue `json:"locks"`
}
// New creates a Lock object (single- or multi-owner depending on concurrency) containing the necessary data for follow up k8s requests
func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string) *DaemonSetLock {
return &DaemonSetLock{client, nodeID, namespace, name, annotation}
func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string, TTL time.Duration, concurrency int, lockReleaseDelay time.Duration) Lock {
if concurrency > 1 {
return &DaemonSetMultiLock{
GenericLock: GenericLock{
TTL: TTL,
releaseDelay: lockReleaseDelay,
},
DaemonSetLock: DaemonSetLock{
client: client,
nodeID: nodeID,
namespace: namespace,
name: name,
annotation: annotation,
},
maxOwners: concurrency,
}
} else {
return &DaemonSetSingleLock{
GenericLock: GenericLock{
TTL: TTL,
releaseDelay: lockReleaseDelay,
},
DaemonSetLock: DaemonSetLock{
client: client,
nodeID: nodeID,
namespace: namespace,
name: name,
annotation: annotation,
},
}
}
}
// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client
func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) {
var ds *v1.DaemonSet
var lastError error
err := wait.PollUntilContextTimeout(context.Background(), sleep, timeout, true, func(ctx context.Context) (bool, error) {
if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil {
return false, nil
}
return true, nil
})
if err != nil {
return nil, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError)
}
return ds, nil
}
// Acquire attempts to annotate the kured daemonset with lock info from instantiated DaemonSetLock using client-go
func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool, string, error) {
func (dsl *DaemonSetSingleLock) Acquire(nodeMetadata NodeMeta) (bool, string, error) {
for {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
@@ -55,7 +137,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
value := lockAnnotationValue{}
value := LockAnnotationValue{}
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
return false, "", err
}
@@ -68,7 +150,7 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
if ds.ObjectMeta.Annotations == nil {
ds.ObjectMeta.Annotations = make(map[string]string)
}
value := lockAnnotationValue{NodeID: dsl.nodeID, Metadata: metadata, Created: time.Now().UTC(), TTL: TTL}
value := LockAnnotationValue{NodeID: dsl.nodeID, Metadata: nodeMetadata, Created: time.Now().UTC(), TTL: dsl.TTL}
valueBytes, err := json.Marshal(&value)
if err != nil {
return false, "", err
@@ -89,140 +171,35 @@ func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool
}
}
// AcquireMultiple creates and annotates the daemonset with a multiple owner lock
func (dsl *DaemonSetLock) AcquireMultiple(metadata interface{}, TTL time.Duration, maxOwners int) (bool, []string, error) {
for {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
return false, []string{}, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
}
annotation := multiLockAnnotationValue{}
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
if err := json.Unmarshal([]byte(valueString), &annotation); err != nil {
return false, []string{}, fmt.Errorf("error getting multi lock: %w", err)
}
}
lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, metadata, TTL, maxOwners)
if !lockPossible {
return false, nodeIDsFromMultiLock(newAnnotation), nil
}
if ds.ObjectMeta.Annotations == nil {
ds.ObjectMeta.Annotations = make(map[string]string)
}
newAnnotationBytes, err := json.Marshal(&newAnnotation)
if err != nil {
return false, []string{}, fmt.Errorf("error marshalling new annotation lock: %w", err)
}
ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes)
_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{})
if err != nil {
if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
time.Sleep(time.Second)
continue
} else {
return false, []string{}, fmt.Errorf("error updating daemonset with multi lock: %w", err)
}
}
return true, nodeIDsFromMultiLock(newAnnotation), nil
}
}
func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string {
nodeIDs := make([]string, 0, len(annotation.LockAnnotations))
for _, nodeLock := range annotation.LockAnnotations {
nodeIDs = append(nodeIDs, nodeLock.NodeID)
}
return nodeIDs
}
func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata interface{}, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) {
newAnnotation := multiLockAnnotationValue{MaxOwners: maxOwners}
freeSpace := false
if annotation.LockAnnotations == nil || len(annotation.LockAnnotations) < maxOwners {
freeSpace = true
newAnnotation.LockAnnotations = annotation.LockAnnotations
} else {
for _, nodeLock := range annotation.LockAnnotations {
if ttlExpired(nodeLock.Created, nodeLock.TTL) {
freeSpace = true
continue
}
newAnnotation.LockAnnotations = append(
newAnnotation.LockAnnotations,
nodeLock,
)
}
}
if freeSpace {
newAnnotation.LockAnnotations = append(
newAnnotation.LockAnnotations,
lockAnnotationValue{
NodeID: dsl.nodeID,
Metadata: metadata,
Created: time.Now().UTC(),
TTL: TTL,
},
)
return true, newAnnotation
}
return false, multiLockAnnotationValue{}
}
// Holding attempts to check whether this node holds the kured daemonset lock (existence, expiry) using client-go
func (dsl *DaemonSetLock) Test(metadata interface{}) (bool, error) {
func (dsl *DaemonSetSingleLock) Holding() (bool, LockAnnotationValue, error) {
var lockData LockAnnotationValue
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
return false, lockData, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
}
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
value := lockAnnotationValue{Metadata: metadata}
value := LockAnnotationValue{}
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
return false, err
return false, lockData, err
}
if !ttlExpired(value.Created, value.TTL) {
return value.NodeID == dsl.nodeID, nil
return value.NodeID == dsl.nodeID, value, nil
}
}
return false, nil
}
// TestMultiple attempts to check the kured daemonset lock status for multi locks
func (dsl *DaemonSetLock) TestMultiple() (bool, error) {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
}
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
value := multiLockAnnotationValue{}
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
return false, err
}
for _, nodeLock := range value.LockAnnotations {
if nodeLock.NodeID == dsl.nodeID && !ttlExpired(nodeLock.Created, nodeLock.TTL) {
return true, nil
}
}
}
return false, nil
return false, lockData, nil
}
// Release attempts to remove the lock data from the kured ds annotations using client-go
func (dsl *DaemonSetLock) Release() error {
func (dsl *DaemonSetSingleLock) Release() error {
if dsl.releaseDelay > 0 {
log.Infof("Waiting %v before releasing lock", dsl.releaseDelay)
time.Sleep(dsl.releaseDelay)
}
for {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
@@ -231,7 +208,7 @@ func (dsl *DaemonSetLock) Release() error {
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
value := lockAnnotationValue{}
value := LockAnnotationValue{}
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
return err
}
@@ -259,8 +236,130 @@ func (dsl *DaemonSetLock) Release() error {
}
}
// ReleaseMultiple attempts to remove the lock data from the kured ds annotations using client-go
func (dsl *DaemonSetLock) ReleaseMultiple() error {
func ttlExpired(created time.Time, ttl time.Duration) bool {
if ttl > 0 && time.Since(created) >= ttl {
return true
}
return false
}
func nodeIDsFromMultiLock(annotation multiLockAnnotationValue) []string {
nodeIDs := make([]string, 0, len(annotation.LockAnnotations))
for _, nodeLock := range annotation.LockAnnotations {
nodeIDs = append(nodeIDs, nodeLock.NodeID)
}
return nodeIDs
}
func (dsl *DaemonSetLock) canAcquireMultiple(annotation multiLockAnnotationValue, metadata NodeMeta, TTL time.Duration, maxOwners int) (bool, multiLockAnnotationValue) {
newAnnotation := multiLockAnnotationValue{MaxOwners: maxOwners}
freeSpace := false
if annotation.LockAnnotations == nil || len(annotation.LockAnnotations) < maxOwners {
freeSpace = true
newAnnotation.LockAnnotations = annotation.LockAnnotations
} else {
for _, nodeLock := range annotation.LockAnnotations {
if ttlExpired(nodeLock.Created, nodeLock.TTL) {
freeSpace = true
continue
}
newAnnotation.LockAnnotations = append(
newAnnotation.LockAnnotations,
nodeLock,
)
}
}
if freeSpace {
newAnnotation.LockAnnotations = append(
newAnnotation.LockAnnotations,
LockAnnotationValue{
NodeID: dsl.nodeID,
Metadata: metadata,
Created: time.Now().UTC(),
TTL: TTL,
},
)
return true, newAnnotation
}
return false, multiLockAnnotationValue{}
}
// Acquire creates and annotates the daemonset with a multiple owner lock
func (dsl *DaemonSetMultiLock) Acquire(nodeMetaData NodeMeta) (bool, string, error) {
for {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
return false, "", fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
}
annotation := multiLockAnnotationValue{}
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
if err := json.Unmarshal([]byte(valueString), &annotation); err != nil {
return false, "", fmt.Errorf("error getting multi lock: %w", err)
}
}
lockPossible, newAnnotation := dsl.canAcquireMultiple(annotation, nodeMetaData, dsl.TTL, dsl.maxOwners)
if !lockPossible {
return false, strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil
}
if ds.ObjectMeta.Annotations == nil {
ds.ObjectMeta.Annotations = make(map[string]string)
}
newAnnotationBytes, err := json.Marshal(&newAnnotation)
if err != nil {
return false, "", fmt.Errorf("error marshalling new annotation lock: %w", err)
}
ds.ObjectMeta.Annotations[dsl.annotation] = string(newAnnotationBytes)
_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.Background(), ds, metav1.UpdateOptions{})
if err != nil {
if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
time.Sleep(time.Second)
continue
} else {
return false, "", fmt.Errorf("error updating daemonset with multi lock: %w", err)
}
}
return true, strings.Join(nodeIDsFromMultiLock(newAnnotation), ","), nil
}
}
// Holding attempts to check the kured daemonset lock status for multi locks
func (dsl *DaemonSetMultiLock) Holding() (bool, LockAnnotationValue, error) {
var lockdata LockAnnotationValue
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
return false, lockdata, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
}
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
if exists {
value := multiLockAnnotationValue{}
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
return false, lockdata, err
}
for _, nodeLock := range value.LockAnnotations {
if nodeLock.NodeID == dsl.nodeID && !ttlExpired(nodeLock.Created, nodeLock.TTL) {
return true, nodeLock, nil
}
}
}
return false, lockdata, nil
}
// Release attempts to remove the lock data for a single node from the multi node annotation
func (dsl *DaemonSetMultiLock) Release() error {
if dsl.releaseDelay > 0 {
log.Infof("Waiting %v before releasing lock", dsl.releaseDelay)
time.Sleep(dsl.releaseDelay)
}
for {
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
if err != nil {
@@ -307,28 +406,3 @@ func (dsl *DaemonSetLock) ReleaseMultiple() error {
return nil
}
}
// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client
func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) {
var ds *v1.DaemonSet
var lastError error
err := wait.PollImmediate(sleep, timeout, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil {
return false, nil
}
return true, nil
})
if err != nil {
return nil, fmt.Errorf("Timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError)
}
return ds, nil
}
func ttlExpired(created time.Time, ttl time.Duration) bool {
if ttl > 0 && time.Since(created) >= ttl {
return true
}
return false
}
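Stepping back from the diff, a hedged sketch of how the refactored constructor and Lock interface fit together; the node, namespace, daemonset name, and annotation key are placeholders, and the client setup assumes an in-cluster config.

package main

import (
    "time"

    "github.com/kubereboot/kured/pkg/daemonsetlock"
    log "github.com/sirupsen/logrus"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // concurrency > 1 yields a DaemonSetMultiLock, otherwise a DaemonSetSingleLock;
    // both satisfy the Lock interface.
    lock := daemonsetlock.New(client, "node-a", "kube-system", "kured",
        "kured.dev/node-lock", // placeholder annotation key
        time.Hour, 1, 5*time.Minute)

    acquired, holder, err := lock.Acquire(daemonsetlock.NodeMeta{Unschedulable: false})
    if err != nil {
        log.Fatal(err)
    }
    if !acquired {
        log.Infof("lock is currently held by: %s", holder)
        return
    }
    // ... drain the node and trigger the reboot here ...
    if err := lock.Release(); err != nil {
        log.Fatal(err)
    }
}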

View File

@@ -66,7 +66,7 @@ func TestCanAcquireMultiple(t *testing.T) {
current: multiLockAnnotationValue{},
desired: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node1Name},
},
},
@@ -80,13 +80,13 @@ func TestCanAcquireMultiple(t *testing.T) {
maxOwners: 2,
current: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node2Name},
},
},
desired: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node1Name},
{NodeID: node2Name},
},
@@ -101,7 +101,7 @@ func TestCanAcquireMultiple(t *testing.T) {
maxOwners: 2,
current: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{
NodeID: node2Name,
Created: time.Now().UTC().Add(-1 * time.Minute),
@@ -116,7 +116,7 @@ func TestCanAcquireMultiple(t *testing.T) {
},
desired: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node2Name},
{NodeID: node3Name},
},
@@ -131,7 +131,7 @@ func TestCanAcquireMultiple(t *testing.T) {
maxOwners: 2,
current: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{
NodeID: node2Name,
Created: time.Now().UTC().Add(-1 * time.Hour),
@@ -146,7 +146,7 @@ func TestCanAcquireMultiple(t *testing.T) {
},
desired: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node1Name},
{NodeID: node3Name},
},
@@ -161,7 +161,7 @@ func TestCanAcquireMultiple(t *testing.T) {
maxOwners: 2,
current: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{
NodeID: node2Name,
Created: time.Now().UTC().Add(-1 * time.Hour),
@@ -176,17 +176,17 @@ func TestCanAcquireMultiple(t *testing.T) {
},
desired: multiLockAnnotationValue{
MaxOwners: 2,
LockAnnotations: []lockAnnotationValue{
LockAnnotations: []LockAnnotationValue{
{NodeID: node1Name},
},
},
lockPossible: true,
},
}
nm := NodeMeta{Unschedulable: false}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, struct{}{}, time.Minute, testCase.maxOwners)
lockPossible, actual := testCase.daemonSetLock.canAcquireMultiple(testCase.current, nm, time.Minute, testCase.maxOwners)
if lockPossible != testCase.lockPossible {
t.Fatalf(
"unexpected result for lock possible (got %t expected %t new annotation %v",

View File

@@ -1,25 +1,37 @@
package reboot
import (
"fmt"
"github.com/google/shlex"
"github.com/kubereboot/kured/pkg/util"
log "github.com/sirupsen/logrus"
)
// CommandRebootMethod holds context-information for a command reboot.
type CommandRebootMethod struct {
nodeID string
rebootCommand []string
// CommandRebooter holds context-information for a command-based reboot
type CommandRebooter struct {
RebootCommand []string
}
// NewCommandReboot creates a new command-rebooter which needs full privileges on the host.
func NewCommandReboot(nodeID string, rebootCommand []string) *CommandRebootMethod {
return &CommandRebootMethod{nodeID: nodeID, rebootCommand: rebootCommand}
}
// Reboot triggers the command-reboot.
func (c *CommandRebootMethod) Reboot() {
log.Infof("Running command: %s for node: %s", c.rebootCommand, c.nodeID)
if err := util.NewCommand(c.rebootCommand[0], c.rebootCommand[1:]...).Run(); err != nil {
log.Fatalf("Error invoking reboot command: %v", err)
// Reboot triggers the reboot command
func (c CommandRebooter) Reboot() error {
log.Infof("Invoking command: %s", c.RebootCommand)
if err := util.NewCommand(c.RebootCommand[0], c.RebootCommand[1:]...).Run(); err != nil {
return fmt.Errorf("error invoking reboot command %s: %v", c.RebootCommand, err)
}
return nil
}
// NewCommandRebooter is the constructor creating a CommandRebooter from a command string
// that has not yet been shell-lexed. You can skip this constructor if you parse the
// command yourself before instantiating a CommandRebooter.
func NewCommandRebooter(rebootCommand string) (*CommandRebooter, error) {
if rebootCommand == "" {
return nil, fmt.Errorf("no reboot command specified")
}
cmd, err := shlex.Split(rebootCommand)
if err != nil {
return nil, fmt.Errorf("error %v when parsing reboot command %s", err, rebootCommand)
}
return &CommandRebooter{RebootCommand: util.PrivilegedHostCommand(1, cmd)}, nil
}

View File

@@ -1,6 +1,9 @@
package reboot
// Reboot interface defines the Reboot function to be implemented.
type Reboot interface {
Reboot()
// Rebooter is the standard interface to use to execute
// the reboot, after it has been considered as necessary.
// The Reboot method returns an error if the reboot
// could not be triggered.
type Rebooter interface {
Reboot() error
}

View File

@@ -1,34 +1,37 @@
package reboot
import (
"fmt"
"os"
"syscall"
log "github.com/sirupsen/logrus"
)
// SignalRebootMethod holds context-information for a signal reboot.
type SignalRebootMethod struct {
nodeID string
signal int
// SignalRebooter holds context-information for a signal reboot.
type SignalRebooter struct {
Signal int
}
// NewSignalReboot creates a new signal-rebooter which can run unprivileged.
func NewSignalReboot(nodeID string, signal int) *SignalRebootMethod {
return &SignalRebootMethod{nodeID: nodeID, signal: signal}
}
// Reboot triggers the signal-reboot.
func (c *SignalRebootMethod) Reboot() {
log.Infof("Emit reboot-signal for node: %s", c.nodeID)
// Reboot triggers the reboot signal
func (c SignalRebooter) Reboot() error {
process, err := os.FindProcess(1)
if err != nil {
log.Fatalf("There was no systemd process found: %v", err)
return fmt.Errorf("not running on Unix: %v", err)
}
err = process.Signal(syscall.Signal(c.signal))
err = process.Signal(syscall.Signal(c.Signal))
// Either PID does not exist, or the signal does not work. Hoping for
// a decent enough error.
if err != nil {
log.Fatalf("Signal of SIGRTMIN+5 failed: %v", err)
return fmt.Errorf("signal of SIGRTMIN+5 failed: %v", err)
}
return nil
}
// NewSignalRebooter is the constructor which sets the signal number.
// It only checks that the signal is positive; further input validation should be done in a later commit.
func NewSignalRebooter(sig int) (*SignalRebooter, error) {
if sig < 1 {
return nil, fmt.Errorf("invalid signal: %v", sig)
}
return &SignalRebooter{Signal: sig}, nil
}
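Both implementations satisfy the Rebooter interface, so a caller can pick one up front and only deal with Reboot() afterwards. A hedged sketch follows; the method switch and the command string are placeholders, not values taken from this change.

package main

import (
    "github.com/kubereboot/kured/pkg/reboot"
    log "github.com/sirupsen/logrus"
)

func main() {
    var rebooter reboot.Rebooter
    var err error

    useSignal := false // placeholder for a reboot-method switch
    if useSignal {
        // 39 is SIGRTMIN+5 on common Linux/glibc systems, asking systemd to reboot.
        rebooter, err = reboot.NewSignalRebooter(39)
    } else {
        // The command is shell-lexed and wrapped with nsenter into PID 1's mount namespace.
        rebooter, err = reboot.NewCommandRebooter("systemctl reboot") // placeholder command
    }
    if err != nil {
        log.Fatal(err)
    }
    if err := rebooter.Reboot(); err != nil {
        log.Fatal(err)
    }
}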

View File

@@ -1,6 +1,7 @@
package util
import (
"fmt"
"os/exec"
log "github.com/sirupsen/logrus"
@@ -21,3 +22,14 @@ func NewCommand(name string, arg ...string) *exec.Cmd {
return cmd
}
// PrivilegedHostCommand wraps the command with nsenter.
// It allows running a command inside another process's mount namespace, for example systemd's (pid 1).
// This relies on hostPID:true and privileged:true to enter the host mount namespace.
// Note that Rancher-based setups need a different pid, which should be user supplied
// until we have a better discovery mechanism.
func PrivilegedHostCommand(pid int, command []string) []string {
cmd := []string{"/usr/bin/nsenter", fmt.Sprintf("-m/proc/%d/ns/mnt", pid), "--"}
cmd = append(cmd, command...)
return cmd
}

pkg/util/util_test.go Normal file
View File

@@ -0,0 +1,31 @@
package util
import (
"reflect"
"testing"
)
func Test_buildHostCommand(t *testing.T) {
type args struct {
pid int
command []string
}
tests := []struct {
name string
args args
want []string
}{
{
name: "Ensure command will run with nsenter",
args: args{pid: 1, command: []string{"ls", "-Fal"}},
want: []string{"/usr/bin/nsenter", "-m/proc/1/ns/mnt", "--", "ls", "-Fal"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := PrivilegedHostCommand(tt.args.pid, tt.args.command); !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildHostCommand() = %v, want %v", got, tt.want)
}
})
}
}