support worker pool in host sensor

Moshe-Rappaport-CA
2022-06-22 18:40:00 +03:00
parent a5007df1bc
commit 120677a91f
3 changed files with 134 additions and 23 deletions


@@ -35,6 +35,7 @@ type HostSensorHandler struct {
 	DaemonSet   *appsv1.DaemonSet
 	podListLock sync.RWMutex
 	gracePeriod int64
+	workerPool  workerPool
 }
 
 func NewHostSensorHandler(k8sObj *k8sinterface.KubernetesApi, hostSensorYAMLFile string) (*HostSensorHandler, error) {
@@ -54,6 +55,7 @@ func NewHostSensorHandler(k8sObj *k8sinterface.KubernetesApi, hostSensorYAMLFile
 		HostSensorPodNames:            map[string]string{},
 		HostSensorUnscheduledPodNames: map[string]string{},
 		gracePeriod:                   int64(15),
+		workerPool:                    NewWorkerPool(),
 	}
 	// Don't deploy on a cluster with no nodes. Some cloud providers prevent termination of K8s objects on clusters with no nodes!
 	if nodeList, err := k8sObj.KubernetesClient.CoreV1().Nodes().List(k8sObj.Context, metav1.ListOptions{}); err != nil || len(nodeList.Items) == 0 {


@@ -32,7 +32,23 @@ func (hsh *HostSensorHandler) HTTPGetToPod(podName, path string) ([]byte, error)
 	restProxy := hsh.k8sObj.KubernetesClient.CoreV1().Pods(hsh.DaemonSet.Namespace).ProxyGet("http", podName, fmt.Sprintf("%d", hsh.HostSensorPort), path, map[string]string{})
 	return restProxy.DoRaw(hsh.k8sObj.Context)
 }
 
+// getResourcesFromPod sends the request to a single pod and packs the response
+// as a hostSensorDataEnvelope
+func (hsh *HostSensorHandler) getResourcesFromPod(podName, nodeName, resourceKind, path string) (hostsensor.HostSensorDataEnvelope, error) {
+	resBytes, err := hsh.HTTPGetToPod(podName, path)
+	if err != nil {
+		return hostsensor.HostSensorDataEnvelope{}, err
+	}
+	hostSensorDataEnvelope := hostsensor.HostSensorDataEnvelope{}
+	hostSensorDataEnvelope.SetApiVersion(k8sinterface.JoinGroupVersion(hostsensor.GroupHostSensor, hostsensor.Version))
+	hostSensorDataEnvelope.SetKind(resourceKind)
+	hostSensorDataEnvelope.SetName(nodeName)
+	hostSensorDataEnvelope.SetData(resBytes)
+	return hostSensorDataEnvelope, nil
+}
+
 func (hsh *HostSensorHandler) ForwardToPod(podName, path string) ([]byte, error) {
@@ -59,35 +75,26 @@ func (hsh *HostSensorHandler) ForwardToPod(podName, path string) ([]byte, error)
 
 // sendAllPodsHTTPGETRequest fills the raw byte response in the envelope and the node name, but not the GroupVersionKind,
 // so the caller is responsible for converting the raw data to structured data and adding the GroupVersionKind details.
 //
+// The function creates a worker pool with a fixed number of workers. For each
+// node, a request is pushed to the jobs channel; a worker sends the request and
+// pushes the result to the results channel. When all workers have finished, the
+// function returns the list of results.
 func (hsh *HostSensorHandler) sendAllPodsHTTPGETRequest(path, requestKind string) ([]hostsensor.HostSensorDataEnvelope, error) {
 	podList, err := hsh.getPodList()
 	if err != nil {
 		return nil, fmt.Errorf("failed to sendAllPodsHTTPGETRequest: %v", err)
 	}
-	res := make([]hostsensor.HostSensorDataEnvelope, 0, len(podList))
-	resLock := sync.Mutex{}
-	wg := sync.WaitGroup{}
-	wg.Add(len(podList))
-	for podName := range podList {
-		go func(podName, path string) {
-			defer wg.Done()
-			resBytes, err := hsh.HTTPGetToPod(podName, path)
-			if err != nil {
-				logger.L().Error("failed to get data", helpers.String("path", path), helpers.String("podName", podName), helpers.Error(err))
-			} else {
-				resLock.Lock()
-				defer resLock.Unlock()
-				hostSensorDataEnvelope := hostsensor.HostSensorDataEnvelope{}
-				hostSensorDataEnvelope.SetApiVersion(k8sinterface.JoinGroupVersion(hostsensor.GroupHostSensor, hostsensor.Version))
-				hostSensorDataEnvelope.SetKind(requestKind)
-				hostSensorDataEnvelope.SetName(podList[podName])
-				hostSensorDataEnvelope.SetData(resBytes)
-				res = append(res, hostSensorDataEnvelope)
-			}
-		}(podName, path)
-	}
-	wg.Wait()
+	res := make([]hostsensor.HostSensorDataEnvelope, 0, len(podList))
+	var wg sync.WaitGroup
+	// initialize the worker pool's channels
+	hsh.workerPool.init()
+	hsh.workerPool.hostSensorApplyJobs(podList, path, requestKind)
+	hsh.workerPool.hostSensorGetResults(&res)
+	hsh.workerPool.createWorkerPool(hsh, &wg)
+	hsh.workerPool.waitForDone(&wg)
 	return res, nil
 }
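
For readers less familiar with this fan-out/fan-in shape, here is a minimal, self-contained sketch of the same pattern, with the host-sensor HTTP request replaced by a stub. All names in it are illustrative; it is not part of the change:

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 10)
	results := make(chan string, 10)
	done := make(chan bool)

	// collector: started before the workers, so a full results buffer
	// can never block them indefinitely
	var collected []string
	go func() {
		for r := range results {
			collected = append(collected, r)
		}
		done <- true
	}()

	// fixed-size worker pool
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- fmt.Sprintf("job %d done", j) // stand-in for the HTTP call
			}
		}()
	}

	// producer: queue the jobs, then close the channel to release the workers
	go func() {
		for j := 0; j < 5; j++ {
			jobs <- j
		}
		close(jobs)
	}()

	wg.Wait()      // every worker has exited
	close(results) // lets the collector's range loop finish
	<-done         // the collector has flushed everything
	fmt.Println(collected)
}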


@@ -0,0 +1,102 @@
+package hostsensorutils
+
+import (
+	"sync"
+
+	"github.com/armosec/kubescape/v2/core/cautils/logger"
+	"github.com/armosec/kubescape/v2/core/cautils/logger/helpers"
+	"github.com/armosec/opa-utils/objectsenvelopes/hostsensor"
+)
+
+const noOfWorkers int = 10
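+
+// Job describes a single host-sensor request: which pod to query, the node it
+// runs on, the kind used to tag the resulting envelope, and the HTTP path.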
+type Job struct {
+	podName     string
+	nodeName    string
+	requestKind string
+	path        string
+}
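+
+// workerPool fans jobs out to a fixed set of workers over the jobs channel
+// and fans their results back in over the results channel.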
+type workerPool struct {
+	jobs    chan Job
+	results chan hostsensor.HostSensorDataEnvelope
+	Done    chan bool
+}
+
+func NewWorkerPool() workerPool {
+	wp := workerPool{}
+	wp.init()
+	return wp
+}
+
+func (wp *workerPool) init() {
+	// Allocate fresh channels for every run. (Receiving from a channel just to
+	// test whether it is open would block forever on a nil or empty channel,
+	// so no such check is made here; the channels from a previous run are no
+	// longer used and are left to the garbage collector.)
+	wp.jobs = make(chan Job, noOfWorkers)
+	wp.results = make(chan hostsensor.HostSensorDataEnvelope, noOfWorkers)
+	wp.Done = make(chan bool)
+}
+
+// The worker takes a job off the jobs channel, executes the request, and
+// pushes the result to the results channel.
+func (wp *workerPool) hostSensorWorker(hsh *HostSensorHandler, wg *sync.WaitGroup) {
+	defer wg.Done()
+	for job := range wp.jobs {
+		hostSensorDataEnvelope, err := hsh.getResourcesFromPod(job.podName, job.nodeName, job.requestKind, job.path)
+		if err != nil {
+			logger.L().Error("failed to get data", helpers.String("path", job.path), helpers.String("podName", job.podName), helpers.Error(err))
+		} else {
+			wp.results <- hostSensorDataEnvelope
+		}
+	}
+}
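+
+// createWorkerPool starts a fixed number of workers, all draining the same jobs channel.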
+func (wp *workerPool) createWorkerPool(hsh *HostSensorHandler, wg *sync.WaitGroup) {
+	for i := 0; i < noOfWorkers; i++ {
+		wg.Add(1)
+		go wp.hostSensorWorker(hsh, wg)
+	}
+}
+
+func (wp *workerPool) waitForDone(wg *sync.WaitGroup) {
+	// Waiting for workers to finish
+	wg.Wait()
+	close(wp.results)
+	// Waiting for the results to be processed
+	<-wp.Done
+}
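+
+// hostSensorGetResults drains the results channel into result, and signals
+// Done once the channel has been closed by waitForDone.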
+func (wp *workerPool) hostSensorGetResults(result *[]hostsensor.HostSensorDataEnvelope) {
+	go func() {
+		for res := range wp.results {
+			*result = append(*result, res)
+		}
+		wp.Done <- true
+	}()
+}
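+
+// hostSensorApplyJobs queues one job per pod, then closes the jobs channel so
+// the workers' range loops can terminate.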
+func (wp *workerPool) hostSensorApplyJobs(podList map[string]string, path, requestKind string) {
+	go func() {
+		for podName, nodeName := range podList {
+			job := Job{
+				podName:     podName,
+				nodeName:    nodeName,
+				requestKind: requestKind,
+				path:        path,
+			}
+			wp.jobs <- job
+		}
+		close(wp.jobs)
+	}()
+}
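
Two ordering details in sendAllPodsHTTPGETRequest make this safe: the results collector is started before the workers, so even if the buffered results channel (capacity noOfWorkers) fills up, the collector keeps draining it and the workers can never block forever; and close(wp.results) happens only in waitForDone, after wg.Wait() has confirmed that no worker can still be sending on it.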