kubescape/core/pkg/hostsensorutils/hostsensordeploy.go

package hostsensorutils

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	logger "github.com/kubescape/go-logger"
	"github.com/kubescape/go-logger/helpers"
	"github.com/kubescape/k8s-interface/k8sinterface"
	"github.com/kubescape/k8s-interface/workloadinterface"
	"github.com/kubescape/kubescape/v2/core/cautils"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
)

var (
	//go:embed hostsensor.yaml
	hostSensorYAML string

	namespaceWasPresent bool
)

const portName string = "scanner"

// HostSensorHandler is a client that interacts with a host-scanner component deployed on nodes.
//
// The API exposed by the host sensor is defined here: https://github.com/kubescape/host-scanner
type HostSensorHandler struct {
	hostSensorPort                int32
	hostSensorPodNames            map[string]string // maps pod names to node names
	hostSensorUnscheduledPodNames map[string]string // maps pod names to node names
	k8sObj                        *k8sinterface.KubernetesApi
	daemonSet                     *appsv1.DaemonSet
	podListLock                   sync.RWMutex
	gracePeriod                   int64
	workerPool                    workerPool
}

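// Typical lifecycle of a HostSensorHandler, shown here as an illustrative sketch only.
// It assumes a pre-built *k8sinterface.KubernetesApi client named k8sObj and a
// context.Context named ctx; error handling is elided:
//
//	hsh, err := NewHostSensorHandler(k8sObj, "")
//	if err != nil {
//		// handle error
//	}
//	if err := hsh.Init(ctx); err != nil {
//		// handle error
//	}
//	defer func() {
//		if err := hsh.TearDown(); err != nil {
//			// handle error
//		}
//	}()
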
// NewHostSensorHandler builds a new HTTP client to the host-scanner API.
func NewHostSensorHandler(k8sObj *k8sinterface.KubernetesApi, hostSensorYAMLFile string) (*HostSensorHandler, error) {
	if k8sObj == nil {
		return nil, fmt.Errorf("nil k8s interface received")
	}

	if hostSensorYAMLFile != "" {
		d, err := loadHostSensorFromFile(hostSensorYAMLFile)
		if err != nil {
			return nil, fmt.Errorf("failed to load host-scanner yaml file, reason: %w", err)
		}
		hostSensorYAML = d
	}

	hsh := &HostSensorHandler{
		k8sObj:                        k8sObj,
		hostSensorPodNames:            map[string]string{},
		hostSensorUnscheduledPodNames: map[string]string{},
		gracePeriod:                   int64(15),
		workerPool:                    newWorkerPool(),
	}

	// Don't deploy on a cluster with no nodes: some cloud providers prevent the termination of K8s objects for clusters with no nodes.
	if nodeList, err := k8sObj.KubernetesClient.CoreV1().Nodes().List(k8sObj.Context, metav1.ListOptions{}); err != nil || len(nodeList.Items) == 0 {
		if err == nil {
			err = fmt.Errorf("no nodes to scan")
		}
		return hsh, fmt.Errorf("in NewHostSensorHandler, failed to get nodes list: %v", err)
	}

	return hsh, nil
}

// Init deploys the host-scanner and starts watching its pods.
func (hsh *HostSensorHandler) Init(ctx context.Context) error {
	// deploy the YAML
	// store namespace + port
	// store pod names
	// make sure all pods are running; after X seconds treat them as running anyway, and log an error for the pods that are not running yet
	logger.L().Info("Installing host scanner")
	logger.L().Debug("The host scanner is a DaemonSet that runs on each node in the cluster. The DaemonSet will be running in its own Namespace and will be deleted once the scan is completed. If you do not wish to install the host scanner, please run the scan without the --enable-host-scan flag.")

	// log is used to avoid duplicated log entries
	// coming from the different host-scanner instances
	log := NewLogCoupling()

	cautils.StartSpinner()
	defer cautils.StopSpinner()

	if err := hsh.applyYAML(ctx); err != nil {
		return fmt.Errorf("failed to apply host scanner YAML, reason: %v", err)
	}

	hsh.populatePodNamesToNodeNames(ctx, log)
	if err := hsh.checkPodForEachNode(); err != nil {
		logger.L().Ctx(ctx).Warning(failedToValidateHostSensorPodStatus, helpers.Error(err))
	}
	return nil
}

// checkNamespaceWasPresent checks whether the given namespace was already present on the cluster and in the "Active" state.
// It returns true if it finds such a namespace, and false otherwise (including when the Kubernetes API call fails).
func (hsh *HostSensorHandler) checkNamespaceWasPresent(namespace string) bool {
	ns, err := hsh.k8sObj.KubernetesClient.
		CoreV1().
		Namespaces().
		Get(hsh.k8sObj.Context, namespace, metav1.GetOptions{})
	if err != nil {
		return false
	}
	// check also that it is in the "Active" state
	if ns.Status.Phase != corev1.NamespaceActive {
		return false
	}
	return true
}

// namespaceWasPresent returns the value of the namespaceWasPresent package-level variable.
func (hsh *HostSensorHandler) namespaceWasPresent() bool {
	return namespaceWasPresent
}

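// applyYAML deploys the host-scanner manifests: it reads the embedded (or user-provided) YAML,
// records whether the target namespace already existed, sets the namespace on every object,
// extracts the scanner container port, creates or updates each workload, and stores the resulting DaemonSet.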
func (hsh *HostSensorHandler) applyYAML(ctx context.Context) error {
	workloads, err := cautils.ReadFile([]byte(hostSensorYAML), cautils.YAML_FILE_FORMAT)
	if err != nil {
		return fmt.Errorf("failed to read YAML files, reason: %v", err)
	}

	// Get namespace name
	namespaceName := ""
	for i := range workloads {
		if workloads[i].GetKind() == "Namespace" {
			namespaceName = workloads[i].GetName()
			break
		}
	}

	// check if namespace was already present on kubernetes
	namespaceWasPresent = hsh.checkNamespaceWasPresent(namespaceName)

	// Update workload data before applying
	for i := range workloads {
		w := workloadinterface.NewWorkloadObj(workloads[i].GetObject())
		if w == nil {
			return fmt.Errorf("invalid workload: %v", workloads[i].GetObject())
		}

		// set namespace in all objects
		if w.GetKind() != "Namespace" {
			w.SetNamespace(namespaceName)
		}

		// Get container port
		if w.GetKind() == "DaemonSet" {
			containers, err := w.GetContainers()
			if err != nil {
				if erra := hsh.tearDownNamespace(namespaceName); erra != nil {
					logger.L().Ctx(ctx).Warning(failedToTeardownNamespace, helpers.Error(erra))
				}
				return fmt.Errorf("container not found in DaemonSet: %v", err)
			}
			for j := range containers {
				for k := range containers[j].Ports {
					if containers[j].Ports[k].Name == portName {
						hsh.hostSensorPort = containers[j].Ports[k].ContainerPort
					}
				}
			}
		}

		// Apply workload
		var newWorkload k8sinterface.IWorkload
		var e error
		if g, err := hsh.k8sObj.GetWorkload(w.GetNamespace(), w.GetKind(), w.GetName()); err == nil && g != nil {
			newWorkload, e = hsh.k8sObj.UpdateWorkload(w)
		} else {
			newWorkload, e = hsh.k8sObj.CreateWorkload(w)
		}
		if e != nil {
			if erra := hsh.tearDownNamespace(namespaceName); erra != nil {
				logger.L().Ctx(ctx).Warning(failedToTeardownNamespace, helpers.Error(erra))
			}
			return fmt.Errorf("failed to create/update YAML, reason: %v", e)
		}

		// Save DaemonSet
		if newWorkload.GetKind() == "DaemonSet" {
			b, err := json.Marshal(newWorkload.GetObject())
			if err != nil {
				if erra := hsh.tearDownNamespace(namespaceName); erra != nil {
					logger.L().Ctx(ctx).Warning(failedToTeardownNamespace, helpers.Error(erra))
				}
				return fmt.Errorf("failed to Marshal YAML of DaemonSet, reason: %v", err)
			}
			var ds appsv1.DaemonSet
			if err := json.Unmarshal(b, &ds); err != nil {
				if erra := hsh.tearDownNamespace(namespaceName); erra != nil {
					logger.L().Ctx(ctx).Warning(failedToTeardownNamespace, helpers.Error(erra))
				}
				return fmt.Errorf("failed to Unmarshal YAML of DaemonSet, reason: %v", err)
			}
			hsh.daemonSet = &ds
		}
	}
	return nil
}

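// checkPodForEachNode waits (up to a fixed deadline) until the number of host-scanner pods,
// scheduled plus unschedulable, covers the number of nodes in the cluster.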
func (hsh *HostSensorHandler) checkPodForEachNode() error {
	deadline := time.Now().Add(time.Second * 100)
	for {
		nodesList, err := hsh.k8sObj.KubernetesClient.CoreV1().Nodes().List(hsh.k8sObj.Context, metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("in checkPodForEachNode, failed to get nodes list: %v", err)
		}

		hsh.podListLock.RLock()
		podsNum := len(hsh.hostSensorPodNames)
		unschedPodNum := len(hsh.hostSensorUnscheduledPodNames)
		hsh.podListLock.RUnlock()

		if len(nodesList.Items) <= podsNum+unschedPodNum {
			break
		}

		if time.Now().After(deadline) {
			hsh.podListLock.RLock()
			podsMap := hsh.hostSensorPodNames
			hsh.podListLock.RUnlock()
			return fmt.Errorf("host-scanner pods number (%d) differs from nodes number (%d) after the deadline was exceeded. Kubescape will take data only from the pods below: %v",
				podsNum, len(nodesList.Items), podsMap)
		}

		time.Sleep(100 * time.Millisecond)
	}
	return nil
}

// populatePodNamesToNodeNames starts a routine that keeps the pod list updated
// by watching the host-scanner DaemonSet pods.
func (hsh *HostSensorHandler) populatePodNamesToNodeNames(ctx context.Context, log *LogsMap) {
	go func() {
		var watchRes watch.Interface
		var err error
		watchRes, err = hsh.k8sObj.KubernetesClient.CoreV1().Pods(hsh.daemonSet.Namespace).Watch(hsh.k8sObj.Context, metav1.ListOptions{
			Watch:         true,
			LabelSelector: fmt.Sprintf("name=%s", hsh.daemonSet.Spec.Template.Labels["name"]),
		})
		if err != nil {
			logger.L().Ctx(ctx).Warning(failedToWatchOverDaemonSetPods, helpers.Error(err))
		}
		if watchRes == nil {
			logger.L().Ctx(ctx).Error("failed to watch over DaemonSet pods, will not be able to get host-scanner data")
			return
		}

		for eve := range watchRes.ResultChan() {
			pod, ok := eve.Object.(*corev1.Pod)
			if !ok {
				continue
			}
			go hsh.updatePodInListAtomic(ctx, eve.Type, pod, log)
		}
	}()
}

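// updatePodInListAtomic updates the pod-to-node maps under the pod list lock,
// based on the watch event type and the pod status.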
func (hsh *HostSensorHandler) updatePodInListAtomic(ctx context.Context, eventType watch.EventType, podObj *corev1.Pod, log *LogsMap) {
	hsh.podListLock.Lock()
	defer hsh.podListLock.Unlock()

	switch eventType {
	case watch.Added, watch.Modified:
		// the pod is running and ready: record it as a scheduled host-scanner pod
		if podObj.Status.Phase == corev1.PodRunning && len(podObj.Status.ContainerStatuses) > 0 &&
			podObj.Status.ContainerStatuses[0].Ready {
			hsh.hostSensorPodNames[podObj.ObjectMeta.Name] = podObj.Spec.NodeName
			delete(hsh.hostSensorUnscheduledPodNames, podObj.ObjectMeta.Name)
		} else {
			// the pod cannot be scheduled: try to recover the target node name from the node affinity
			if podObj.Status.Phase == corev1.PodPending && len(podObj.Status.Conditions) > 0 &&
				podObj.Status.Conditions[0].Reason == corev1.PodReasonUnschedulable {
				nodeName := ""
				if podObj.Spec.Affinity != nil && podObj.Spec.Affinity.NodeAffinity != nil &&
					podObj.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil &&
					len(podObj.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) > 0 &&
					len(podObj.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields) > 0 &&
					len(podObj.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values) > 0 {
					nodeName = podObj.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
				}
				// warn only once per message to avoid duplicated logs from the different instances
				if !log.isDuplicated(oneHostSensorPodIsUnabledToSchedule) {
					logger.L().Ctx(ctx).Warning(oneHostSensorPodIsUnabledToSchedule,
						helpers.String("message", podObj.Status.Conditions[0].Message))
					log.update(oneHostSensorPodIsUnabledToSchedule)
				}
				if nodeName != "" {
					hsh.hostSensorUnscheduledPodNames[podObj.ObjectMeta.Name] = nodeName
				}
			} else {
				delete(hsh.hostSensorPodNames, podObj.ObjectMeta.Name)
			}
		}
	default:
		delete(hsh.hostSensorPodNames, podObj.ObjectMeta.Name)
	}
}

// tearDownHostScanner manages the host-scanner DaemonSet deletion.
func (hsh *HostSensorHandler) tearDownHostScanner(namespace string) error {
	client := hsh.k8sObj.KubernetesClient

	// delete the host-scanner DaemonSet
	err := client.AppsV1().
		DaemonSets(namespace).
		Delete(
			hsh.k8sObj.Context,
			hsh.daemonSet.Name,
			metav1.DeleteOptions{
				GracePeriodSeconds: &hsh.gracePeriod,
			},
		)
	if err != nil {
		return fmt.Errorf("failed to delete host-scanner DaemonSet: %v", err)
	}

	// wait for the DaemonSet pods to be deleted
	err = hsh.waitHostScannerDeleted(hsh.k8sObj.Context)
	if err != nil {
		return fmt.Errorf("failed to delete host-scanner DaemonSet: %v", err)
	}
	return nil
}

// tearDownNamespace manages the deletion of the given namespace.
// It first checks whether the namespace was already present before installing the host-scanner;
// in that case it skips the deletion.
// Otherwise, it patches the namespace to remove the finalizers and finally deletes it.
func (hsh *HostSensorHandler) tearDownNamespace(namespace string) error {
	// if the namespace was already present on the cluster (before installing the host-scanner),
	// then we shouldn't delete it
	if hsh.namespaceWasPresent() {
		return nil
	}

	// to make it more readable we store the object client in a variable
	client := hsh.k8sObj.KubernetesClient

	// prepare the JSON patch that removes the finalizers from the namespace
	patchData := `
	[
		{
			"op": "replace",
			"path": "/metadata/finalizers",
			"value": []
		}
	]
	`

	// patch the namespace object, removing the finalizers
	_, err := client.CoreV1().
		Namespaces().
		Patch(
			hsh.k8sObj.Context,
			namespace,
			types.JSONPatchType,
			[]byte(patchData),
			metav1.PatchOptions{},
		)
	if err != nil {
		return fmt.Errorf("failed to remove finalizers from Namespace: %v", err)
	}

	// delete the namespace object
	err = client.CoreV1().
		Namespaces().
		Delete(
			hsh.k8sObj.Context,
			namespace,
			metav1.DeleteOptions{
				GracePeriodSeconds: &hsh.gracePeriod,
			},
		)
	if err != nil {
		return fmt.Errorf("failed to delete %s Namespace: %v", namespace, err)
	}
	return nil
}

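// TearDown removes the host-scanner DaemonSet and, if it was created by Kubescape, its Namespace.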
func (hsh *HostSensorHandler) TearDown() error {
	namespace := hsh.GetNamespace()

	// delete DaemonSet
	if err := hsh.tearDownHostScanner(namespace); err != nil {
		return fmt.Errorf("failed to delete host-scanner DaemonSet: %v", err)
	}

	// delete Namespace
	if err := hsh.tearDownNamespace(namespace); err != nil {
		return fmt.Errorf("failed to delete host-scanner Namespace: %v", err)
	}
	return nil
}

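// GetNamespace returns the namespace of the deployed host-scanner DaemonSet,
// or an empty string if the DaemonSet has not been deployed.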
func (hsh *HostSensorHandler) GetNamespace() string {
	if hsh.daemonSet == nil {
		return ""
	}
	return hsh.daemonSet.Namespace
}

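// loadHostSensorFromFile reads a host-scanner YAML manifest from the given file path.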
func loadHostSensorFromFile(hostSensorYAMLFile string) (string, error) {
	dat, err := os.ReadFile(hostSensorYAMLFile)
	if err != nil {
		return "", err
	}
	// TODO - Add file validation
	return string(dat), err
}

// waitHostScannerDeleted watches for the host-scanner pods to be deleted.
// It returns an error if the watch cannot be created.
func (hsh *HostSensorHandler) waitHostScannerDeleted(ctx context.Context) error {
	labelSelector := fmt.Sprintf("name=%s", hsh.daemonSet.Name)
	opts := metav1.ListOptions{
		TypeMeta:      metav1.TypeMeta{},
		LabelSelector: labelSelector,
		FieldSelector: "",
	}
	watcher, err := hsh.k8sObj.KubernetesClient.CoreV1().
		Pods(hsh.daemonSet.Namespace).
		Watch(ctx, opts)
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for {
		select {
		case event := <-watcher.ResultChan():
			if event.Type == watch.Deleted {
				return nil
			}
		case <-ctx.Done():
			return nil
		}
	}
}