update status busy to support more than one req

This commit is contained in:
David Wertenteil
2022-05-19 12:01:28 +03:00
parent fc9b713851
commit d08fdf2e9e
3 changed files with 64 additions and 76 deletions

View File

@@ -15,24 +15,23 @@ import (
// Metrics http listener for prometheus support // Metrics http listener for prometheus support
func (handler *HTTPHandler) Metrics(w http.ResponseWriter, r *http.Request) { func (handler *HTTPHandler) Metrics(w http.ResponseWriter, r *http.Request) {
if handler.state.isBusy() { // if already scanning the cluster if handler.state.len() > 0 { // if already scanning the cluster
message := fmt.Sprintf("scan '%s' in action", handler.state.getID()) message := fmt.Sprintf("scan '%s' in action", handler.state.getLatestID())
logger.L().Info("server is busy", helpers.String("message", message), helpers.Time()) logger.L().Info("server is busy", helpers.String("message", message), helpers.Time())
w.WriteHeader(http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(message)) w.Write([]byte(message))
return return
} }
handler.state.setBusy()
defer handler.state.setNotBusy()
scanID := uuid.NewString() scanID := uuid.NewString()
handler.state.setID(scanID) handler.state.setBusy(scanID)
defer handler.state.setNotBusy(scanID)
resultsFile := filepath.Join(OutputDir, scanID) resultsFile := filepath.Join(OutputDir, scanID)
// trigger scanning // trigger scanning
logger.L().Info(handler.state.getID(), helpers.String("action", "triggering scan"), helpers.Time()) logger.L().Info(scanID, helpers.String("action", "triggering scan"), helpers.Time())
ks := core.NewKubescape() ks := core.NewKubescape()
results, err := ks.Scan(getPrometheusDefaultScanCommand(scanID, resultsFile)) results, err := ks.Scan(getPrometheusDefaultScanCommand(scanID, resultsFile))
if err != nil { if err != nil {
@@ -41,7 +40,7 @@ func (handler *HTTPHandler) Metrics(w http.ResponseWriter, r *http.Request) {
return return
} }
results.HandleResults() results.HandleResults()
logger.L().Info(handler.state.getID(), helpers.String("action", "done scanning"), helpers.Time()) logger.L().Info(scanID, helpers.String("action", "done scanning"), helpers.Time())
f, err := os.ReadFile(resultsFile) f, err := os.ReadFile(resultsFile)
if err != nil { if err != nil {

View File

@@ -47,7 +47,7 @@ func NewHTTPHandler() *HTTPHandler {
// ============================================== STATUS ======================================================== // ============================================== STATUS ========================================================
// Status API // Status API
func (handler *HTTPHandler) Status(w http.ResponseWriter, r *http.Request) { func (handler *HTTPHandler) Status(w http.ResponseWriter, r *http.Request) {
defer handler.recover(w) defer handler.recover(w, "")
if r.Method != http.MethodGet { if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
@@ -59,25 +59,22 @@ func (handler *HTTPHandler) Status(w http.ResponseWriter, r *http.Request) {
statusQueryParams := &StatusQueryParams{} statusQueryParams := &StatusQueryParams{}
if err := schema.NewDecoder().Decode(statusQueryParams, r.URL.Query()); err != nil { if err := schema.NewDecoder().Decode(statusQueryParams, r.URL.Query()); err != nil {
handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error())) handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error()), "")
return return
} }
if !handler.state.isBusy() { if !handler.state.isBusy(statusQueryParams.ScanID) {
response.Type = utilsapisv1.NotBusyScanResponseType response.Type = utilsapisv1.NotBusyScanResponseType
w.Write(responseToBytes(&response)) w.Write(responseToBytes(&response))
return return
} }
currentScanID := handler.state.getID() if statusQueryParams.ScanID == "" {
if statusQueryParams.ScanID != "" && currentScanID != statusQueryParams.ScanID { statusQueryParams.ScanID = handler.state.getLatestID()
response.Type = utilsapisv1.NotBusyScanResponseType
w.Write(responseToBytes(&response))
return
} }
response.Response = currentScanID response.Response = statusQueryParams.ScanID
response.ID = currentScanID response.ID = statusQueryParams.ScanID
response.Type = utilsapisv1.BusyScanResponseType response.Type = utilsapisv1.BusyScanResponseType
w.Write(responseToBytes(&response)) w.Write(responseToBytes(&response))
} }
@@ -86,7 +83,10 @@ func (handler *HTTPHandler) Status(w http.ResponseWriter, r *http.Request) {
// Scan API - TODO: break down to functions // Scan API - TODO: break down to functions
func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) { func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) {
defer handler.recover(w) // generate id
scanID := uuid.NewString()
defer handler.recover(w, scanID)
defer r.Body.Close() defer r.Body.Close()
@@ -99,30 +99,20 @@ func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) {
scanQueryParams := &ScanQueryParams{} scanQueryParams := &ScanQueryParams{}
if err := schema.NewDecoder().Decode(scanQueryParams, r.URL.Query()); err != nil { if err := schema.NewDecoder().Decode(scanQueryParams, r.URL.Query()); err != nil {
handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error())) handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error()), scanID)
return
}
if handler.state.isBusy() {
// TODO - Add to queue
w.WriteHeader(http.StatusOK)
response.Response = handler.state.getID()
response.ID = handler.state.getID()
response.Type = utilsapisv1.IDScanResponseType
w.Write(responseToBytes(&response))
return return
} }
handler.state.setBusy() handler.state.setBusy(scanID)
// Add to queue
// generate id
scanID := uuid.NewString()
handler.state.setID(scanID)
response.ID = scanID response.ID = scanID
response.Type = utilsapisv1.IDScanResponseType response.Type = utilsapisv1.IDScanResponseType
readBuffer, err := ioutil.ReadAll(r.Body) readBuffer, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
handler.writeError(w, fmt.Errorf("failed to read request body, reason: %s", err.Error())) handler.writeError(w, fmt.Errorf("failed to read request body, reason: %s", err.Error()), scanID)
return return
} }
@@ -130,7 +120,7 @@ func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) {
scanRequest := utilsmetav1.PostScanRequest{} scanRequest := utilsmetav1.PostScanRequest{}
if err := json.Unmarshal(readBuffer, &scanRequest); err != nil { if err := json.Unmarshal(readBuffer, &scanRequest); err != nil {
handler.writeError(w, fmt.Errorf("failed to parse request payload, reason: %s", err.Error())) handler.writeError(w, fmt.Errorf("failed to parse request payload, reason: %s", err.Error()), scanID)
return return
} }
@@ -166,7 +156,7 @@ func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) {
logger.L().Debug("deleting results", helpers.String("ID", scanID)) logger.L().Debug("deleting results", helpers.String("ID", scanID))
removeResultsFile(scanID) removeResultsFile(scanID)
} }
handler.state.setNotBusy() handler.state.setNotBusy(scanID)
}() }()
wg.Wait() wg.Wait()
@@ -174,6 +164,11 @@ func (handler *HTTPHandler) Scan(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(statusCode) w.WriteHeader(statusCode)
w.Write(responseToBytes(&response)) w.Write(responseToBytes(&response))
} }
func (handler *HTTPHandler) scan() {
for {
}
}
// ============================================== RESULTS ======================================================== // ============================================== RESULTS ========================================================
@@ -182,13 +177,13 @@ func (handler *HTTPHandler) Results(w http.ResponseWriter, r *http.Request) {
response := utilsmetav1.Response{} response := utilsmetav1.Response{}
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
defer handler.recover(w) defer handler.recover(w, "")
defer r.Body.Close() defer r.Body.Close()
resultsQueryParams := &ResultsQueryParams{} resultsQueryParams := &ResultsQueryParams{}
if err := schema.NewDecoder().Decode(resultsQueryParams, r.URL.Query()); err != nil { if err := schema.NewDecoder().Decode(resultsQueryParams, r.URL.Query()); err != nil {
handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error())) handler.writeError(w, fmt.Errorf("failed to parse query params, reason: %s", err.Error()), "")
return return
} }
@@ -206,14 +201,13 @@ func (handler *HTTPHandler) Results(w http.ResponseWriter, r *http.Request) {
} }
response.ID = resultsQueryParams.ScanID response.ID = resultsQueryParams.ScanID
if handler.state.isBusy() { // if requested ID is still scanning if handler.state.isBusy(resultsQueryParams.ScanID) { // if requested ID is still scanning
if resultsQueryParams.ScanID == handler.state.getID() { logger.L().Info("scan in process", helpers.String("ID", resultsQueryParams.ScanID))
logger.L().Info("scan in process", helpers.String("ID", resultsQueryParams.ScanID)) w.WriteHeader(http.StatusOK)
w.WriteHeader(http.StatusOK) response.Response = fmt.Sprintf("scanning '%s' in progress", resultsQueryParams.ScanID)
response.Response = "scanning in progress" w.Write(responseToBytes(&response))
w.Write(responseToBytes(&response)) return
return
}
} }
switch r.Method { switch r.Method {
@@ -264,10 +258,10 @@ func responseToBytes(res *utilsmetav1.Response) []byte {
return b return b
} }
func (handler *HTTPHandler) recover(w http.ResponseWriter) { func (handler *HTTPHandler) recover(w http.ResponseWriter, scanID string) {
response := utilsmetav1.Response{} response := utilsmetav1.Response{}
if err := recover(); err != nil { if err := recover(); err != nil {
handler.state.setNotBusy() handler.state.setNotBusy(scanID)
logger.L().Error("recover", helpers.Error(fmt.Errorf("%v", err))) logger.L().Error("recover", helpers.Error(fmt.Errorf("%v", err)))
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
response.Response = fmt.Sprintf("%v", err) response.Response = fmt.Sprintf("%v", err)
@@ -276,11 +270,11 @@ func (handler *HTTPHandler) recover(w http.ResponseWriter) {
} }
} }
func (handler *HTTPHandler) writeError(w http.ResponseWriter, err error) { func (handler *HTTPHandler) writeError(w http.ResponseWriter, err error, scanID string) {
response := utilsmetav1.Response{} response := utilsmetav1.Response{}
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
response.Response = err.Error() response.Response = err.Error()
response.Type = utilsapisv1.ErrorScanResponseType response.Type = utilsapisv1.ErrorScanResponseType
w.Write(responseToBytes(&response)) w.Write(responseToBytes(&response))
handler.state.setNotBusy() handler.state.setNotBusy(scanID)
} }

View File

@@ -3,44 +3,32 @@ package v1
import "sync" import "sync"
type serverState struct { type serverState struct {
// response string statusID map[string]bool
busy bool
id string
latestID string latestID string
mtx sync.RWMutex mtx sync.RWMutex
} }
func (s *serverState) isBusy() bool { // isBusy is server busy with ID, if id is empty will check for latest ID
func (s *serverState) isBusy(id string) bool {
s.mtx.RLock() s.mtx.RLock()
busy := s.busy if id == "" {
id = s.latestID
}
busy := s.statusID[id]
s.mtx.RUnlock() s.mtx.RUnlock()
return busy return busy
} }
func (s *serverState) setBusy() { func (s *serverState) setBusy(id string) {
s.mtx.Lock() s.mtx.Lock()
s.busy = true s.statusID[id] = true
s.latestID = id
s.mtx.Unlock() s.mtx.Unlock()
} }
func (s *serverState) setNotBusy() { func (s *serverState) setNotBusy(id string) {
s.mtx.Lock() s.mtx.Lock()
s.busy = false delete(s.statusID, id)
s.latestID = s.id
s.id = ""
s.mtx.Unlock()
}
func (s *serverState) getID() string {
s.mtx.RLock()
id := s.id
s.mtx.RUnlock()
return id
}
func (s *serverState) setID(id string) {
s.mtx.Lock()
s.id = id
s.mtx.Unlock() s.mtx.Unlock()
} }
@@ -51,9 +39,16 @@ func (s *serverState) getLatestID() string {
return id return id
} }
func (s *serverState) len() int {
s.mtx.RLock()
l := len(s.statusID)
s.mtx.RUnlock()
return l
}
func newServerState() *serverState { func newServerState() *serverState {
return &serverState{ return &serverState{
busy: false, statusID: make(map[string]bool),
mtx: sync.RWMutex{}, mtx: sync.RWMutex{},
} }
} }