adding thread lock (#974)

Signed-off-by: Paige Patton <prubenda@redhat.com>
This commit is contained in:
Paige Patton
2025-11-26 09:37:19 -05:00
committed by GitHub
parent 4527d073c6
commit 4ebfc5dde5
3 changed files with 29 additions and 16 deletions

View File

@@ -15,7 +15,7 @@ def invoke(command, timeout=None):
# Invokes a given command and returns the stdout
def invoke_no_exit(command, timeout=None):
def invoke_no_exit(command, timeout=15):
output = ""
try:
output = subprocess.check_output(command, shell=True, universal_newlines=True, timeout=timeout, stderr=subprocess.DEVNULL)

View File

@@ -20,6 +20,7 @@ class VirtChecker:
self.namespace = get_yaml_item_value(kubevirt_check_config, "namespace", "")
self.vm_list = []
self.threads = []
self.iteration_lock = threading.Lock() # Lock to protect current_iterations
self.threads_limit = threads_limit
# setting to 0 in case no variables are set, so no threads later get made
self.batch_size = 0
@@ -57,7 +58,6 @@ class VirtChecker:
elif len(node_name_list) == 0:
# If node_name_list is blank, add all vms
self.vm_list.append(VirtCheck({'vm_name':vmi_name, 'ip_address': ip_address, 'namespace':namespace, 'node_name':node_name, "new_ip_address":""}))
self.batch_size = math.ceil(len(self.vm_list)/self.threads_limit)
def check_disconnected_access(self, ip_address: str, worker_name:str = '', vmi_name: str = ''):
@@ -121,8 +121,7 @@ class VirtChecker:
for thread in self.threads:
thread.join()
def batch_list(self, queue: queue.Queue = None):
logging.info("batch size" + str(self.batch_size))
def batch_list(self, queue: queue.SimpleQueue = None):
if self.batch_size > 0:
# Provided prints to easily visualize how the threads are processed.
for i in range (0, len(self.vm_list),self.batch_size):
@@ -135,13 +134,23 @@ class VirtChecker:
self.threads.append(t)
t.start()
def run_virt_check(self, vm_list_batch, virt_check_telemetry_queue: queue.Queue):
def increment_iterations(self):
"""Thread-safe method to increment current_iterations"""
with self.iteration_lock:
self.current_iterations += 1
def run_virt_check(self, vm_list_batch, virt_check_telemetry_queue: queue.SimpleQueue):
virt_check_telemetry = []
virt_check_tracker = {}
while self.current_iterations < self.iterations:
while True:
# Thread-safe read of current_iterations
with self.iteration_lock:
current = self.current_iterations
if current >= self.iterations:
break
for vm in vm_list_batch:
start_time= datetime.now()
try:
if not self.disconnected:
vm_status = self.get_vm_access(vm.vm_name, vm.namespace)
@@ -157,8 +166,9 @@ class VirtChecker:
if new_node_name and vm.node_name != new_node_name:
vm.node_name = new_node_name
except Exception:
logging.info('Exception in get vm status')
vm_status = False
if vm.vm_name not in virt_check_tracker:
start_timestamp = datetime.now()
virt_check_tracker[vm.vm_name] = {
@@ -171,6 +181,7 @@ class VirtChecker:
"new_ip_address": vm.new_ip_address
}
else:
if vm_status != virt_check_tracker[vm.vm_name]["status"]:
end_timestamp = datetime.now()
start_timestamp = virt_check_tracker[vm.vm_name]["start_timestamp"]
@@ -199,9 +210,11 @@ class VirtChecker:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm]))
else:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm]))
virt_check_telemetry_queue.put(virt_check_telemetry)
def run_post_virt_check(self, vm_list_batch, virt_check_telemetry, post_virt_check_queue: queue.Queue):
try:
virt_check_telemetry_queue.put(virt_check_telemetry)
except Exception as e:
logging.error('Put queue error ' + str(e))
def run_post_virt_check(self, vm_list_batch, virt_check_telemetry, post_virt_check_queue: queue.SimpleQueue):
virt_check_telemetry = []
virt_check_tracker = {}
@@ -240,7 +253,7 @@ class VirtChecker:
def gather_post_virt_checks(self, kubevirt_check_telem):
post_kubevirt_check_queue = queue.Queue()
post_kubevirt_check_queue = queue.SimpleQueue()
post_threads = []
if self.batch_size > 0:

View File

@@ -326,7 +326,7 @@ def main(options, command: Optional[str]) -> int:
args=(health_check_config, health_check_telemetry_queue))
health_check_worker.start()
kubevirt_check_telemetry_queue = queue.Queue()
kubevirt_check_telemetry_queue = queue.SimpleQueue()
kubevirt_checker = VirtChecker(kubevirt_check_config, iterations=iterations, krkn_lib=kubecli)
kubevirt_checker.batch_list(kubevirt_check_telemetry_queue)
@@ -393,8 +393,7 @@ def main(options, command: Optional[str]) -> int:
iteration += 1
health_checker.current_iterations += 1
kubevirt_checker.current_iterations += 1
kubevirt_checker.increment_iterations()
# telemetry
# in order to print decoded telemetry data even if telemetry collection
# is disabled, it's necessary to serialize the ChaosRunTelemetry object
@@ -411,6 +410,7 @@ def main(options, command: Optional[str]) -> int:
while not kubevirt_check_telemetry_queue.empty():
kubevirt_check_telem.extend(kubevirt_check_telemetry_queue.get_nowait())
chaos_telemetry.virt_checks = kubevirt_check_telem
post_kubevirt_check = kubevirt_checker.gather_post_virt_checks(kubevirt_check_telem)
chaos_telemetry.post_virt_checks = post_kubevirt_check
# if platform is openshift will be collected
@@ -732,4 +732,4 @@ if __name__ == "__main__":
with open(junit_testcase_file_path, "w") as stream:
stream.write(junit_testcase_xml)
sys.exit(retval)
sys.exit(retval)