mirror of
https://github.com/kubereboot/kured.git
synced 2026-05-25 09:43:28 +00:00
171 lines
5.5 KiB
Go
171 lines
5.5 KiB
Go
package daemonsetlock
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
v1 "k8s.io/api/apps/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/kubernetes"
|
|
)
|
|
|
|
// Retry tuning for the Kubernetes API calls made by this package.
const (
	// k8sAPICallRetrySleep is how long to wait between retries of a k8s API call.
	k8sAPICallRetrySleep = 5 * time.Second

	// k8sAPICallRetryTimeout is how long to keep retrying before concluding that
	// the k8s API is definitively unavailable.
	k8sAPICallRetryTimeout = 5 * time.Minute
)
|
|
|
|
// DaemonSetLock holds all necessary information to do actions
// on the kured ds which holds lock info through annotations.
//
// NOTE: New builds this struct with a positional composite literal, so the
// field order below must not change without updating New.
type DaemonSetLock struct {
	// client is used for every DaemonSet Get/Update issued by this lock.
	client *kubernetes.Clientset
	// nodeID identifies this lock participant; it is written into (and compared
	// against) the NodeID field of the lock annotation.
	nodeID string
	// namespace is the namespace of the DaemonSet that carries the lock.
	namespace string
	// name is the name of the DaemonSet that carries the lock.
	name string
	// annotation is the annotation key under which the lock value is stored.
	annotation string
}
|
|
|
|
// lockAnnotationValue is the JSON document stored in the lock annotation.
// The field order and JSON tags define the serialized form that json.Marshal
// writes, so they should be treated as a wire format.
type lockAnnotationValue struct {
	// NodeID is the node ID of the current lock holder.
	NodeID string `json:"nodeID"`
	// Metadata is caller-supplied data stored alongside the lock; it is omitted
	// from the JSON when nil.
	Metadata interface{} `json:"metadata,omitempty"`
	// Created is when the lock was taken (Acquire records time.Now().UTC()).
	Created time.Time `json:"created"`
	// TTL is the lock lifetime; encoding/json serializes a time.Duration as its
	// integer nanosecond count. A non-positive TTL means the lock never expires
	// (see ttlExpired).
	TTL time.Duration `json:"TTL"`
}
|
|
|
|
// New creates a daemonsetLock object containing the necessary data for follow up k8s requests
|
|
func New(client *kubernetes.Clientset, nodeID, namespace, name, annotation string) *DaemonSetLock {
|
|
return &DaemonSetLock{client, nodeID, namespace, name, annotation}
|
|
}
|
|
|
|
// Acquire attempts to annotate the kured daemonset with lock info from instantiated DaemonSetLock using client-go
|
|
func (dsl *DaemonSetLock) Acquire(metadata interface{}, TTL time.Duration) (bool, string, error) {
|
|
for {
|
|
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
|
|
}
|
|
|
|
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
|
|
if exists {
|
|
value := lockAnnotationValue{}
|
|
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
|
|
return false, "", err
|
|
}
|
|
|
|
if !ttlExpired(value.Created, value.TTL) {
|
|
return value.NodeID == dsl.nodeID, value.NodeID, nil
|
|
}
|
|
}
|
|
|
|
if ds.ObjectMeta.Annotations == nil {
|
|
ds.ObjectMeta.Annotations = make(map[string]string)
|
|
}
|
|
value := lockAnnotationValue{NodeID: dsl.nodeID, Metadata: metadata, Created: time.Now().UTC(), TTL: TTL}
|
|
valueBytes, err := json.Marshal(&value)
|
|
if err != nil {
|
|
return false, "", err
|
|
}
|
|
ds.ObjectMeta.Annotations[dsl.annotation] = string(valueBytes)
|
|
|
|
_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
|
|
// Something else updated the resource between us reading and writing - try again soon
|
|
time.Sleep(time.Second)
|
|
continue
|
|
} else {
|
|
return false, "", err
|
|
}
|
|
}
|
|
return true, dsl.nodeID, nil
|
|
}
|
|
}
|
|
|
|
// Test attempts to check the kured daemonset lock status (existence, expiry) from instantiated DaemonSetLock using client-go
|
|
func (dsl *DaemonSetLock) Test(metadata interface{}) (bool, error) {
|
|
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
|
|
if err != nil {
|
|
return false, fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
|
|
}
|
|
|
|
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
|
|
if exists {
|
|
value := lockAnnotationValue{Metadata: metadata}
|
|
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if !ttlExpired(value.Created, value.TTL) {
|
|
return value.NodeID == dsl.nodeID, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// Release attempts to remove the lock data from the kured ds annotations using client-go
|
|
func (dsl *DaemonSetLock) Release() error {
|
|
for {
|
|
ds, err := dsl.GetDaemonSet(k8sAPICallRetrySleep, k8sAPICallRetryTimeout)
|
|
if err != nil {
|
|
return fmt.Errorf("timed out trying to get daemonset %s in namespace %s: %w", dsl.name, dsl.namespace, err)
|
|
}
|
|
|
|
valueString, exists := ds.ObjectMeta.Annotations[dsl.annotation]
|
|
if exists {
|
|
value := lockAnnotationValue{}
|
|
if err := json.Unmarshal([]byte(valueString), &value); err != nil {
|
|
return err
|
|
}
|
|
|
|
if value.NodeID != dsl.nodeID {
|
|
return fmt.Errorf("Not lock holder: %v", value.NodeID)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("Lock not held")
|
|
}
|
|
|
|
delete(ds.ObjectMeta.Annotations, dsl.annotation)
|
|
|
|
_, err = dsl.client.AppsV1().DaemonSets(dsl.namespace).Update(context.TODO(), ds, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
if se, ok := err.(*errors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
|
|
// Something else updated the resource between us reading and writing - try again soon
|
|
time.Sleep(time.Second)
|
|
continue
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// GetDaemonSet returns the named DaemonSet resource from the DaemonSetLock's configured client
|
|
func (dsl *DaemonSetLock) GetDaemonSet(sleep, timeout time.Duration) (*v1.DaemonSet, error) {
|
|
var ds *v1.DaemonSet
|
|
var lastError error
|
|
err := wait.PollImmediate(sleep, timeout, func() (bool, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
if ds, lastError = dsl.client.AppsV1().DaemonSets(dsl.namespace).Get(ctx, dsl.name, metav1.GetOptions{}); lastError != nil {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Timed out trying to get daemonset %s in namespace %s: %v", dsl.name, dsl.namespace, lastError)
|
|
}
|
|
return ds, nil
|
|
}
|
|
|
|
// ttlExpired reports whether a lock created at created with lifetime ttl has
// run out. A non-positive ttl means the lock never expires; otherwise the lock
// is expired once at least ttl has elapsed since created.
func ttlExpired(created time.Time, ttl time.Duration) bool {
	if ttl <= 0 {
		return false
	}
	return time.Since(created) >= ttl
}
|