mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-05-23 17:44:02 +00:00
found another variable I think should be locked
This commit is contained in:
@@ -10,9 +10,10 @@ from __main__ import config
|
||||
from ..types import ActiveHunter, Hunter
|
||||
|
||||
from ...core.events.types import HuntFinished
|
||||
import threading
|
||||
|
||||
working_count = 0
|
||||
global is_running_lock
|
||||
is_running_lock = Lock()
|
||||
|
||||
|
||||
# Inherits Queue object, handles events asynchronously
|
||||
class EventQueue(Queue, object):
|
||||
@@ -22,7 +23,9 @@ class EventQueue(Queue, object):
|
||||
self.active_hunters = dict()
|
||||
|
||||
self.hooks = defaultdict(list)
|
||||
is_running_lock.acquire()
|
||||
self.running = True
|
||||
is_running_lock.release()
|
||||
self.workers = list()
|
||||
|
||||
for i in range(num_worker):
|
||||
@@ -34,14 +37,14 @@ class EventQueue(Queue, object):
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
|
||||
# decorator wrapping for easy subscription
|
||||
def subscribe(self, event, hook=None, predicate=None):
|
||||
def wrapper(hook):
|
||||
self.subscribe_event(event, hook=hook, predicate=predicate)
|
||||
return hook
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
# getting uninstantiated event object
|
||||
def subscribe_event(self, event, hook=None, predicate=None):
|
||||
if ActiveHunter in hook.__mro__:
|
||||
@@ -71,6 +74,7 @@ class EventQueue(Queue, object):
|
||||
|
||||
# executes callbacks on dedicated thread as a daemon
|
||||
def worker(self):
|
||||
is_running_lock.acquire()
|
||||
while self.running:
|
||||
hook = self.get()
|
||||
try:
|
||||
@@ -78,17 +82,20 @@ class EventQueue(Queue, object):
|
||||
except Exception as ex:
|
||||
logging.debug(ex.message)
|
||||
self.task_done()
|
||||
is_running_lock.release()
|
||||
logging.debug("closing thread...")
|
||||
|
||||
def notifier(self):
|
||||
time.sleep(2)
|
||||
time.sleep(2)
|
||||
while self.unfinished_tasks > 0:
|
||||
logging.debug("{} tasks left".format(self.unfinished_tasks))
|
||||
time.sleep(3)
|
||||
|
||||
# stops execution of all daemons
|
||||
def free(self):
|
||||
is_running_lock.acquire()
|
||||
self.running = False
|
||||
is_running_lock.release()
|
||||
with self.mutex:
|
||||
self.queue.clear()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user