Merge pull request #555 from Moshe-Rappaport-CA/dev

Working with worker pool in host sensor
This commit is contained in:
David Wertenteil
2022-07-07 11:30:45 +03:00
committed by GitHub
11 changed files with 183 additions and 34 deletions

View File

@@ -62,6 +62,11 @@ func NewOPASessionObjMock() *OPASessionObj {
ReportID: "",
JobID: "",
},
Metadata: &reporthandlingv2.Metadata{
ScanMetadata: reporthandlingv2.ScanMetadata{
ScanningTarget: 0,
},
},
}
}

View File

@@ -266,10 +266,23 @@ func scanInfoToScanMetadata(scanInfo *ScanInfo) *reporthandlingv2.Metadata {
if len(scanInfo.InputPatterns) > 0 {
inputFiles = scanInfo.InputPatterns[0]
}
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.Cluster
if GetScanningContext(inputFiles) != ContextCluster {
switch GetScanningContext(inputFiles) {
case ContextCluster:
// cluster
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.Cluster
case ContextFile:
// local file
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.File
case ContextGitURL:
// url
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.Repo
case ContextGitLocal:
// local-git
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.GitLocal
case ContextDir:
// directory
metadata.ScanMetadata.ScanningTarget = reporthandlingv2.Directory
}
setContextMetadata(&metadata.ContextMetadata, inputFiles)

View File

@@ -35,6 +35,7 @@ type HostSensorHandler struct {
DaemonSet *appsv1.DaemonSet
podListLock sync.RWMutex
gracePeriod int64
workerPool workerPool
}
func NewHostSensorHandler(k8sObj *k8sinterface.KubernetesApi, hostSensorYAMLFile string) (*HostSensorHandler, error) {
@@ -54,6 +55,7 @@ func NewHostSensorHandler(k8sObj *k8sinterface.KubernetesApi, hostSensorYAMLFile
HostSensorPodNames: map[string]string{},
HostSensorUnscheduledPodNames: map[string]string{},
gracePeriod: int64(15),
workerPool: NewWorkerPool(),
}
// Don't deploy on cluster with no nodes. Some cloud providers prevents termination of K8s objects for cluster with no nodes!!!
if nodeList, err := k8sObj.KubernetesClient.CoreV1().Nodes().List(k8sObj.Context, metav1.ListOptions{}); err != nil || len(nodeList.Items) == 0 {

View File

@@ -32,7 +32,23 @@ func (hsh *HostSensorHandler) HTTPGetToPod(podName, path string) ([]byte, error)
restProxy := hsh.k8sObj.KubernetesClient.CoreV1().Pods(hsh.DaemonSet.Namespace).ProxyGet("http", podName, fmt.Sprintf("%d", hsh.HostSensorPort), path, map[string]string{})
return restProxy.DoRaw(hsh.k8sObj.Context)
}
func (hsh *HostSensorHandler) getResourcesFromPod(podName, nodeName, resourceKind, path string) (hostsensor.HostSensorDataEnvelope, error) {
// send the request and pack the response as an hostSensorDataEnvelope
resBytes, err := hsh.HTTPGetToPod(podName, path)
if err != nil {
return hostsensor.HostSensorDataEnvelope{}, err
}
hostSensorDataEnvelope := hostsensor.HostSensorDataEnvelope{}
hostSensorDataEnvelope.SetApiVersion(k8sinterface.JoinGroupVersion(hostsensor.GroupHostSensor, hostsensor.Version))
hostSensorDataEnvelope.SetKind(resourceKind)
hostSensorDataEnvelope.SetName(nodeName)
hostSensorDataEnvelope.SetData(resBytes)
return hostSensorDataEnvelope, nil
}
func (hsh *HostSensorHandler) ForwardToPod(podName, path string) ([]byte, error) {
@@ -59,35 +75,26 @@ func (hsh *HostSensorHandler) ForwardToPod(podName, path string) ([]byte, error)
// sendAllPodsHTTPGETRequest fills the raw byte response in the envelope and the node name, but not the GroupVersionKind
// so the caller is responsible to convert the raw data to some structured data and add the GroupVersionKind details
//
// The function produces a worker-pool with a fixed number of workers.
// For each node the request is pushed to the jobs channel, the worker sends the request and pushes the result to the result channel.
// When all workers have finished, the function returns a list of results
func (hsh *HostSensorHandler) sendAllPodsHTTPGETRequest(path, requestKind string) ([]hostsensor.HostSensorDataEnvelope, error) {
podList, err := hsh.getPodList()
if err != nil {
return nil, fmt.Errorf("failed to sendAllPodsHTTPGETRequest: %v", err)
}
res := make([]hostsensor.HostSensorDataEnvelope, 0, len(podList))
resLock := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(podList))
for podName := range podList {
go func(podName, path string) {
defer wg.Done()
resBytes, err := hsh.HTTPGetToPod(podName, path)
if err != nil {
logger.L().Error("failed to get data", helpers.String("path", path), helpers.String("podName", podName), helpers.Error(err))
} else {
resLock.Lock()
defer resLock.Unlock()
hostSensorDataEnvelope := hostsensor.HostSensorDataEnvelope{}
hostSensorDataEnvelope.SetApiVersion(k8sinterface.JoinGroupVersion(hostsensor.GroupHostSensor, hostsensor.Version))
hostSensorDataEnvelope.SetKind(requestKind)
hostSensorDataEnvelope.SetName(podList[podName])
hostSensorDataEnvelope.SetData(resBytes)
res = append(res, hostSensorDataEnvelope)
}
}(podName, path)
}
wg.Wait()
res := make([]hostsensor.HostSensorDataEnvelope, 0, len(podList))
var wg sync.WaitGroup
// initialization of the channels
hsh.workerPool.init(len(podList))
hsh.workerPool.hostSensorApplyJobs(podList, path, requestKind)
hsh.workerPool.hostSensorGetResults(&res)
hsh.workerPool.createWorkerPool(hsh, &wg)
hsh.workerPool.waitForDone(&wg)
return res, nil
}

View File

@@ -0,0 +1,96 @@
package hostsensorutils
import (
"sync"
"github.com/armosec/kubescape/v2/core/cautils/logger"
"github.com/armosec/kubescape/v2/core/cautils/logger/helpers"
"github.com/armosec/opa-utils/objectsenvelopes/hostsensor"
)
const noOfWorkers int = 10
type job struct {
podName string
nodeName string
requestKind string
path string
}
type workerPool struct {
jobs chan job
results chan hostsensor.HostSensorDataEnvelope
done chan bool
noOfWorkers int
}
func NewWorkerPool() workerPool {
wp := workerPool{}
wp.noOfWorkers = noOfWorkers
wp.init()
return wp
}
func (wp *workerPool) init(noOfPods ...int) {
if noOfPods != nil && len(noOfPods) > 0 && noOfPods[0] < noOfWorkers {
wp.noOfWorkers = noOfPods[0]
}
// init the channels
wp.jobs = make(chan job, noOfWorkers)
wp.results = make(chan hostsensor.HostSensorDataEnvelope, noOfWorkers)
wp.done = make(chan bool)
}
// The worker takes a job out of the chan, executes the request, and pushes the result to the results chan
func (wp *workerPool) hostSensorWorker(hsh *HostSensorHandler, wg *sync.WaitGroup) {
defer wg.Done()
for job := range wp.jobs {
hostSensorDataEnvelope, err := hsh.getResourcesFromPod(job.podName, job.nodeName, job.requestKind, job.path)
if err != nil {
logger.L().Error("failed to get data", helpers.String("path", job.path), helpers.String("podName", job.podName), helpers.Error(err))
} else {
wp.results <- hostSensorDataEnvelope
}
}
}
func (wp *workerPool) createWorkerPool(hsh *HostSensorHandler, wg *sync.WaitGroup) {
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go wp.hostSensorWorker(hsh, wg)
}
}
func (wp *workerPool) waitForDone(wg *sync.WaitGroup) {
// Waiting for workers to finish
wg.Wait()
close(wp.results)
// Waiting for the results to be processed
<-wp.done
}
func (wp *workerPool) hostSensorGetResults(result *[]hostsensor.HostSensorDataEnvelope) {
go func() {
for res := range wp.results {
*result = append(*result, res)
}
wp.done <- true
}()
}
func (wp *workerPool) hostSensorApplyJobs(podList map[string]string, path, requestKind string) {
go func() {
for podName, nodeName := range podList {
job := job{
podName: podName,
nodeName: nodeName,
requestKind: requestKind,
path: path,
}
wp.jobs <- job
}
close(wp.jobs)
}()
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/armosec/k8s-interface/workloadinterface"
reporthandlingv2 "github.com/armosec/opa-utils/reporthandling/v2"
"github.com/armosec/opa-utils/resources"
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/rego"
@@ -62,7 +63,7 @@ func (opap *OPAProcessor) ProcessRulesListenner() error {
}
func (opap *OPAProcessor) Process(policies *cautils.Policies) error {
logger.L().Info("Scanning", helpers.String("cluster", cautils.ClusterName))
opap.loggerStartScanning()
cautils.StartSpinner()
@@ -89,11 +90,30 @@ func (opap *OPAProcessor) Process(policies *cautils.Policies) error {
opap.Report.ReportGenerationTime = time.Now().UTC()
cautils.StopSpinner()
logger.L().Success("Done scanning", helpers.String("cluster", cautils.ClusterName))
opap.loggerDoneScanning()
return errs
}
func (opap *OPAProcessor) loggerStartScanning() {
targetScan := opap.OPASessionObj.Metadata.ScanMetadata.ScanningTarget
if reporthandlingv2.Cluster == targetScan {
logger.L().Info("Scanning", helpers.String(targetScan.String(), cautils.ClusterName))
} else {
logger.L().Info("Scanning " + targetScan.String())
}
}
func (opap *OPAProcessor) loggerDoneScanning() {
targetScan := opap.OPASessionObj.Metadata.ScanMetadata.ScanningTarget
if reporthandlingv2.Cluster == targetScan {
logger.L().Success("Done scanning", helpers.String(targetScan.String(), cautils.ClusterName))
} else {
logger.L().Success("Done scanning " + targetScan.String())
}
}
func (opap *OPAProcessor) processControl(control *reporthandling.Control) (map[string]resourcesresults.ResourceAssociatedControl, error) {
var errs error

View File

@@ -74,6 +74,12 @@ func (report *ReportEventReceiver) SetClusterName(clusterName string) {
}
func (report *ReportEventReceiver) prepareReport(opaSessionObj *cautils.OPASessionObj) error {
// All scans whose target is not a cluster, currently their target is a file, which is what the backend expects
// (e.g. local-git, directory, etc)
if opaSessionObj.Metadata.ScanMetadata.ScanningTarget != reporthandlingv2.Cluster {
opaSessionObj.Metadata.ScanMetadata.ScanningTarget = reporthandlingv2.File
}
report.initEventReceiverURL()
host := hostToString(report.eventReceiverURL, report.reportID)

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/armosec/armoapi-go v0.0.97
github.com/armosec/go-git-url v0.0.13
github.com/armosec/k8s-interface v0.0.78
github.com/armosec/opa-utils v0.0.159
github.com/armosec/opa-utils v0.0.160
github.com/armosec/rbac-utils v0.0.14
github.com/armosec/utils-go v0.0.7
github.com/armosec/utils-k8s-go v0.0.7

4
go.sum
View File

@@ -193,8 +193,8 @@ github.com/armosec/go-git-url v0.0.13 h1:kwzHBL7oFqf2UsR0h4Sx+HMYqO9xHHuBFXGIyuM
github.com/armosec/go-git-url v0.0.13/go.mod h1:GzfssG3IW9KiURSpK7c/bySBRTlghpObQ7NQ1O4hcMI=
github.com/armosec/k8s-interface v0.0.78 h1:zqIzbFQqSxVMjcDsB4+lgGNgkHVqDZ369/WTGGdLKCE=
github.com/armosec/k8s-interface v0.0.78/go.mod h1:8NX4xWXh8mwW7QyZdZea1czNdM2azCK9BbUNmiZYXW0=
github.com/armosec/opa-utils v0.0.159 h1:GREyXsM8v1tdIY6FDYjM7roXozt191JTjwpfrjkDJIU=
github.com/armosec/opa-utils v0.0.159/go.mod h1:ce7GrjUmp4A2bkw6ItJxnE1GeDeVQxwb3HiwKBbEZTA=
github.com/armosec/opa-utils v0.0.160 h1:KgYN57Fh4t0+EELE/napAB+ysX1X3kDg28Sh34hxt1Y=
github.com/armosec/opa-utils v0.0.160/go.mod h1:ce7GrjUmp4A2bkw6ItJxnE1GeDeVQxwb3HiwKBbEZTA=
github.com/armosec/rbac-utils v0.0.14 h1:CKYKcgqJEXWF2Hen/B1pVGtS3nDAG1wp9dDv6oNtq90=
github.com/armosec/rbac-utils v0.0.14/go.mod h1:Ex/IdGWhGv9HZq6Hs8N/ApzCKSIvpNe/ETqDfnuyah0=
github.com/armosec/utils-go v0.0.7 h1:YGyIzfo7JKocx0SVrKEpQLSKLNLgm+JOEeH0XkO23yQ=

View File

@@ -6,7 +6,7 @@ replace github.com/armosec/kubescape/v2 => ../
require (
github.com/armosec/kubescape/v2 v2.0.0-00010101000000-000000000000
github.com/armosec/opa-utils v0.0.159
github.com/armosec/opa-utils v0.0.160
github.com/armosec/utils-go v0.0.7
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0

View File

@@ -193,8 +193,8 @@ github.com/armosec/go-git-url v0.0.13 h1:kwzHBL7oFqf2UsR0h4Sx+HMYqO9xHHuBFXGIyuM
github.com/armosec/go-git-url v0.0.13/go.mod h1:GzfssG3IW9KiURSpK7c/bySBRTlghpObQ7NQ1O4hcMI=
github.com/armosec/k8s-interface v0.0.78 h1:zqIzbFQqSxVMjcDsB4+lgGNgkHVqDZ369/WTGGdLKCE=
github.com/armosec/k8s-interface v0.0.78/go.mod h1:8NX4xWXh8mwW7QyZdZea1czNdM2azCK9BbUNmiZYXW0=
github.com/armosec/opa-utils v0.0.159 h1:GREyXsM8v1tdIY6FDYjM7roXozt191JTjwpfrjkDJIU=
github.com/armosec/opa-utils v0.0.159/go.mod h1:ce7GrjUmp4A2bkw6ItJxnE1GeDeVQxwb3HiwKBbEZTA=
github.com/armosec/opa-utils v0.0.160 h1:KgYN57Fh4t0+EELE/napAB+ysX1X3kDg28Sh34hxt1Y=
github.com/armosec/opa-utils v0.0.160/go.mod h1:ce7GrjUmp4A2bkw6ItJxnE1GeDeVQxwb3HiwKBbEZTA=
github.com/armosec/rbac-utils v0.0.14 h1:CKYKcgqJEXWF2Hen/B1pVGtS3nDAG1wp9dDv6oNtq90=
github.com/armosec/rbac-utils v0.0.14/go.mod h1:Ex/IdGWhGv9HZq6Hs8N/ApzCKSIvpNe/ETqDfnuyah0=
github.com/armosec/utils-go v0.0.7 h1:YGyIzfo7JKocx0SVrKEpQLSKLNLgm+JOEeH0XkO23yQ=