diff --git a/.coveragerc b/.coveragerc index b0bb9b7e..af01e9dc 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,4 @@ [run] omit = tests/* - krkn/tests/** \ No newline at end of file + krkn/tests/** diff --git a/tests/test_ingress_network_plugin.py b/tests/ingress_network/test_ingress_network_plugin.py similarity index 100% rename from tests/test_ingress_network_plugin.py rename to tests/ingress_network/test_ingress_network_plugin.py diff --git a/tests/run_python_plugin.py b/tests/run_python_plugin.py new file mode 100644 index 00000000..a29c01a9 --- /dev/null +++ b/tests/run_python_plugin.py @@ -0,0 +1,37 @@ +import tempfile +import unittest + +from krkn.scenario_plugins.native.run_python_plugin import ( + RunPythonFileInput, + run_python_file, +) + + +class RunPythonPluginTest(unittest.TestCase): + def test_success_execution(self): + tmp_file = tempfile.NamedTemporaryFile() + tmp_file.write(bytes("print('Hello world!')", "utf-8")) + tmp_file.flush() + output_id, output_data = run_python_file( + params=RunPythonFileInput(tmp_file.name), + run_id="test-python-plugin-success", + ) + self.assertEqual("success", output_id) + self.assertEqual("Hello world!\n", output_data.stdout) + + def test_error_execution(self): + tmp_file = tempfile.NamedTemporaryFile() + tmp_file.write( + bytes("import sys\nprint('Hello world!')\nsys.exit(42)\n", "utf-8") + ) + tmp_file.flush() + output_id, output_data = run_python_file( + params=RunPythonFileInput(tmp_file.name), run_id="test-python-plugin-error" + ) + self.assertEqual("error", output_id) + self.assertEqual(42, output_data.exit_code) + self.assertEqual("Hello world!\n", output_data.stdout) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_application_outage_scenario_plugin.py b/tests/test_application_outage_scenario_plugin.py new file mode 100644 index 00000000..9137d65f --- /dev/null +++ b/tests/test_application_outage_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for 
ApplicationOutageScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_application_outage_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.application_outage.application_outage_scenario_plugin import ApplicationOutageScenarioPlugin + + +class TestApplicationOutageScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ApplicationOutageScenarioPlugin + """ + self.plugin = ApplicationOutageScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["application_outages_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_container_scenario_plugin.py b/tests/test_container_scenario_plugin.py new file mode 100644 index 00000000..8ca40365 --- /dev/null +++ b/tests/test_container_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for ContainerScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_container_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.container.container_scenario_plugin import ContainerScenarioPlugin + + +class TestContainerScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ContainerScenarioPlugin + """ + self.plugin = ContainerScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + 
self.assertEqual(result, ["container_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_health_checker.py b/tests/test_health_checker.py new file mode 100644 index 00000000..295950e1 --- /dev/null +++ b/tests/test_health_checker.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 + +""" +Test suite for HealthChecker class + +This test file provides comprehensive coverage for the main functionality of HealthChecker: +- HTTP request making with various authentication methods +- Health check monitoring with status tracking +- Failure detection and recovery tracking +- Exit on failure behavior +- Telemetry collection + +Usage: + python -m coverage run -a -m unittest tests/test_health_checker.py -v + +Assisted By: Claude Code +""" + +import queue +import unittest +from datetime import datetime +from unittest.mock import MagicMock, patch + +from krkn_lib.models.telemetry.models import HealthCheck + +from krkn.utils.HealthChecker import HealthChecker + + +class TestHealthChecker(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for HealthChecker + """ + self.checker = HealthChecker(iterations=5) + self.health_check_queue = queue.Queue() + + def tearDown(self): + """ + Clean up after each test + """ + self.checker.current_iterations = 0 + self.checker.ret_value = 0 + + def make_increment_side_effect(self, response_data): + """ + Helper to create a side effect that increments current_iterations + """ + def side_effect(*args, **kwargs): + self.checker.current_iterations += 1 + return response_data + return side_effect + + @patch('requests.get') + def test_make_request_success(self, mock_get): + """ + Test make_request returns success for 200 status code + """ + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + result = self.checker.make_request("http://example.com") + + self.assertEqual(result["url"], "http://example.com") + 
self.assertEqual(result["status"], True) + self.assertEqual(result["status_code"], 200) + mock_get.assert_called_once_with( + "http://example.com", + auth=None, + headers=None, + verify=True, + timeout=3 + ) + + @patch('requests.get') + def test_make_request_with_auth(self, mock_get): + """ + Test make_request with basic authentication + """ + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + auth = ("user", "pass") + result = self.checker.make_request("http://example.com", auth=auth) + + self.assertEqual(result["status"], True) + mock_get.assert_called_once_with( + "http://example.com", + auth=auth, + headers=None, + verify=True, + timeout=3 + ) + + @patch('requests.get') + def test_make_request_with_bearer_token(self, mock_get): + """ + Test make_request with bearer token authentication + """ + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + headers = {"Authorization": "Bearer token123"} + result = self.checker.make_request("http://example.com", headers=headers) + + self.assertEqual(result["status"], True) + mock_get.assert_called_once_with( + "http://example.com", + auth=None, + headers=headers, + verify=True, + timeout=3 + ) + + @patch('requests.get') + def test_make_request_failure(self, mock_get): + """ + Test make_request returns failure for non-200 status code + """ + mock_response = MagicMock() + mock_response.status_code = 500 + mock_get.return_value = mock_response + + result = self.checker.make_request("http://example.com") + + self.assertEqual(result["status"], False) + self.assertEqual(result["status_code"], 500) + + @patch('requests.get') + def test_make_request_with_verify_false(self, mock_get): + """ + Test make_request with SSL verification disabled + """ + mock_response = MagicMock() + mock_response.status_code = 200 + mock_get.return_value = mock_response + + result = self.checker.make_request("https://example.com", verify=False) + + 
self.assertEqual(result["status"], True) + mock_get.assert_called_once_with( + "https://example.com", + auth=None, + headers=None, + verify=False, + timeout=3 + ) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_empty_config(self, mock_sleep, mock_make_request): + """ + Test run_health_check with empty config skips checks + """ + config = { + "config": [], + "interval": 2 + } + + self.checker.run_health_check(config, self.health_check_queue) + + mock_make_request.assert_not_called() + self.assertTrue(self.health_check_queue.empty()) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_successful_requests(self, mock_sleep, mock_make_request): + """ + Test run_health_check with all successful requests + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 2 + self.checker.run_health_check(config, self.health_check_queue) + + # Should have telemetry + self.assertFalse(self.health_check_queue.empty()) + telemetry = self.health_check_queue.get() + self.assertEqual(len(telemetry), 1) + self.assertEqual(telemetry[0].status, True) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_failure_then_recovery(self, mock_sleep, mock_make_request): + """ + Test run_health_check detects failure and recovery + """ + # Create side effects that increment and return different values + call_count = [0] + def side_effect(*args, **kwargs): + self.checker.current_iterations += 1 + call_count[0] += 1 + if call_count[0] == 1: + return {"url": "http://example.com", "status": False, "status_code": 500} + else: + return 
{"url": "http://example.com", "status": True, "status_code": 200} + + mock_make_request.side_effect = side_effect + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 3 + self.checker.run_health_check(config, self.health_check_queue) + + # Should have telemetry showing failure period + self.assertFalse(self.health_check_queue.empty()) + telemetry = self.health_check_queue.get() + + # Should have at least 2 entries: one for failure period, one for success period + self.assertGreaterEqual(len(telemetry), 1) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_with_bearer_token(self, mock_sleep, mock_make_request): + """ + Test run_health_check correctly handles bearer token + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": "test-token-123", + "auth": None, + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # Verify bearer token was added to headers + # make_request is called as: make_request(url, auth, headers, verify_url) + call_args = mock_make_request.call_args + self.assertEqual(call_args[0][2]['Authorization'], "Bearer test-token-123") + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_with_auth(self, mock_sleep, mock_make_request): + """ + Test run_health_check correctly handles basic auth + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "http://example.com", + 
"bearer_token": None, + "auth": "user,pass", + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # Verify auth tuple was created correctly + # make_request is called as: make_request(url, auth, headers, verify_url) + call_args = mock_make_request.call_args + self.assertEqual(call_args[0][1], ("user", "pass")) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_exit_on_failure(self, mock_sleep, mock_make_request): + """ + Test run_health_check sets ret_value=2 when exit_on_failure is True + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": False, + "status_code": 500 + }) + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": True + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # ret_value should be set to 2 on failure + self.assertEqual(self.checker.ret_value, 2) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_exit_on_failure_not_set_on_success(self, mock_sleep, mock_make_request): + """ + Test run_health_check does not set ret_value when request succeeds + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": True + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # ret_value should remain 0 on success + self.assertEqual(self.checker.ret_value, 0) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + 
@patch('time.sleep') + def test_run_health_check_with_verify_url_false(self, mock_sleep, mock_make_request): + """ + Test run_health_check respects verify_url setting + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "https://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "https://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False, + "verify_url": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # Verify that verify parameter was set to False + # make_request is called as: make_request(url, auth, headers, verify_url) + call_args = mock_make_request.call_args + self.assertEqual(call_args[0][3], False) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_exception_handling(self, mock_sleep, mock_make_request): + """ + Test run_health_check handles exceptions during requests + """ + # Simulate exception during request but also increment to avoid infinite loop + def side_effect(*args, **kwargs): + self.checker.current_iterations += 1 + raise Exception("Connection error") + + mock_make_request.side_effect = side_effect + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + + # Should not raise exception + self.checker.run_health_check(config, self.health_check_queue) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_multiple_urls(self, mock_sleep, mock_make_request): + """ + Test run_health_check with multiple URLs + """ + call_count = [0] + def side_effect(*args, **kwargs): + call_count[0] += 1 + # Increment only after both URLs are called (one iteration) + if call_count[0] % 2 == 0: + 
self.checker.current_iterations += 1 + return { + "status": True, + "status_code": 200 + } + + mock_make_request.side_effect = side_effect + + config = { + "config": [ + { + "url": "http://example1.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + }, + { + "url": "http://example2.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + } + ], + "interval": 0.01 + } + + self.checker.iterations = 1 + self.checker.run_health_check(config, self.health_check_queue) + + # Should have called make_request for both URLs + self.assertEqual(mock_make_request.call_count, 2) + + @patch('krkn.utils.HealthChecker.HealthChecker.make_request') + @patch('time.sleep') + def test_run_health_check_custom_interval(self, mock_sleep, mock_make_request): + """ + Test run_health_check uses custom interval + """ + mock_make_request.side_effect = self.make_increment_side_effect({ + "url": "http://example.com", + "status": True, + "status_code": 200 + }) + + config = { + "config": [ + { + "url": "http://example.com", + "bearer_token": None, + "auth": None, + "exit_on_failure": False + } + ], + "interval": 5 + } + + self.checker.iterations = 2 + self.checker.run_health_check(config, self.health_check_queue) + + # Verify sleep was called with custom interval + mock_sleep.assert_called_with(5) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_hogs_scenario_plugin.py b/tests/test_hogs_scenario_plugin.py new file mode 100644 index 00000000..3cd0a0e4 --- /dev/null +++ b/tests/test_hogs_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for HogsScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_hogs_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.hogs.hogs_scenario_plugin import 
HogsScenarioPlugin + + +class TestHogsScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for HogsScenarioPlugin + """ + self.plugin = HogsScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["hog_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_kubevirt_vm_outage.py b/tests/test_kubevirt_vm_outage.py new file mode 100644 index 00000000..cded8b51 --- /dev/null +++ b/tests/test_kubevirt_vm_outage.py @@ -0,0 +1,696 @@ +#!/usr/bin/env python3 + +""" +Test suite for KubeVirt VM Outage Scenario Plugin class + +Note: This test file uses mocks extensively to avoid needing actual Kubernetes/KubeVirt infrastructure. + +Usage: + python -m coverage run -a -m unittest tests/test_kubevirt_vm_outage.py -v + +Assisted By: Claude Code +""" + +import copy +import itertools +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +import yaml +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.models.k8s import AffectedPod, PodsStatus +from krkn_lib.models.telemetry import ScenarioTelemetry +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift +from kubernetes.client.rest import ApiException + +from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin + +class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for KubevirtVmOutageScenarioPlugin + """ + self.plugin = KubevirtVmOutageScenarioPlugin() + + # Create mock k8s client + self.k8s_client = MagicMock() + self.custom_object_client = MagicMock() + self.k8s_client.custom_object_client = self.custom_object_client + self.plugin.k8s_client = self.k8s_client + self.plugin.custom_object_client = self.custom_object_client + + # Mock 
methods needed for KubeVirt operations + self.k8s_client.list_custom_resource_definition = MagicMock() + + # Mock custom resource definition list with KubeVirt CRDs + crd_list = MagicMock() + crd_item = MagicMock() + crd_item.spec = MagicMock() + crd_item.spec.group = "kubevirt.io" + crd_list.items = [crd_item] + self.k8s_client.list_custom_resource_definition.return_value = crd_list + + # Mock VMI data + self.mock_vmi = { + "metadata": { + "name": "test-vm", + "namespace": "default" + }, + "status": { + "phase": "Running" + } + } + + # Create test config + self.config = { + "scenarios": [ + { + "name": "kubevirt outage test", + "scenario": "kubevirt_vm_outage", + "parameters": { + "vm_name": "test-vm", + "namespace": "default", + "duration": 0 + } + } + ] + } + + # Create a temporary config file + temp_dir = tempfile.gettempdir() + self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml") + with open(self.scenario_file, "w") as f: + yaml.dump(self.config, f) + + # Mock dependencies + self.telemetry = MagicMock(spec=KrknTelemetryOpenshift) + self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry) + self.telemetry.get_lib_kubernetes.return_value = self.k8s_client + + # Initialize counters for reusable mock functions + self.delete_count = 0 + self.wait_count = 0 + + def mock_delete(self, *args, **kwargs): + self.delete_count += 1 + self.plugin.affected_pod = AffectedPod(pod_name=f"test-vm-{self.delete_count}", namespace="default") + self.plugin.affected_pod.pod_rescheduling_time = 5.0 + return 0 + + def mock_wait(self, *args, **kwargs): + self.wait_count += 1 + self.plugin.affected_pod.pod_readiness_time = 3.0 + return 0 + + def test_successful_injection_and_recovery(self): + """ + Test successful deletion and recovery of a VMI + """ + # Populate vmis_list to avoid randrange error + self.plugin.vmis_list = [self.mock_vmi] + + # Mock get_vmis to not clear the list + with patch.object(self.plugin, 'get_vmis'): + # Mock get_vmi to return our mock 
VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock validate_environment to return True + with patch.object(self.plugin, 'validate_environment', return_value=True): + # Mock delete_vmi and wait_for_running to simulate success + with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete: + with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 0) + mock_delete.assert_called_once_with("test-vm", "default", False) + mock_wait.assert_called_once_with("test-vm", "default", 60) + + def test_injection_failure(self): + """ + Test failure during VMI deletion + """ + # Populate vmis_list to avoid randrange error + self.plugin.vmis_list = [self.mock_vmi] + + # Mock get_vmis to not clear the list + with patch.object(self.plugin, 'get_vmis'): + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock validate_environment to return True + with patch.object(self.plugin, 'validate_environment', return_value=True): + # Mock delete_vmi to simulate failure + with patch.object(self.plugin, 'delete_vmi', return_value=1) as mock_delete: + with patch.object(self.plugin, 'wait_for_running', return_value=0) as mock_wait: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 0) + mock_delete.assert_called_once_with("test-vm", "default", False) + mock_wait.assert_not_called() + + def test_disable_auto_restart(self): + """ + Test VM auto-restart can be disabled + """ + # Configure test with disable_auto_restart=True + 
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True + + # Populate vmis_list to avoid randrange error + self.plugin.vmis_list = [self.mock_vmi] + + # Mock get_vmis to not clear the list + with patch.object(self.plugin, 'get_vmis'): + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock validate_environment to return True + with patch.object(self.plugin, 'validate_environment', return_value=True): + # Mock delete_vmi and wait_for_running + with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete: + with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 0) + # delete_vmi should be called with disable_auto_restart=True + mock_delete.assert_called_once_with("test-vm", "default", True) + mock_wait.assert_called_once_with("test-vm", "default", 60) + + def test_recovery_when_vmi_does_not_exist(self): + """ + Test recovery logic when VMI does not exist after deletion + """ + # Initialize the plugin's custom_object_client + self.plugin.custom_object_client = self.custom_object_client + + # Store the original VMI in the plugin for recovery + self.plugin.original_vmi = self.mock_vmi.copy() + + # Create a cleaned vmi_dict as the plugin would + vmi_dict = self.mock_vmi.copy() + + # Set up running VMI data for after recovery + running_vmi = { + "metadata": {"name": "test-vm", "namespace": "default"}, + "status": {"phase": "Running"} + } + + # Set up time.time to immediately exceed the timeout for auto-recovery + with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]): + # Mock get_vmi to always return None (not auto-recovered) + with patch.object(self.plugin, 'get_vmi', side_effect=[None, 
None, running_vmi]): + # Mock the custom object API to return success + self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi) + + # Run recovery with mocked time.sleep + with patch('time.sleep'): + result = self.plugin.recover("test-vm", "default", False) + + self.assertEqual(result, 0) + # Verify create was called with the right arguments for our API version and kind + self.custom_object_client.create_namespaced_custom_object.assert_called_once_with( + group="kubevirt.io", + version="v1", + namespace="default", + plural="virtualmachineinstances", + body=vmi_dict + ) + + def test_validation_failure(self): + """ + Test validation failure when KubeVirt is not installed + """ + # Populate vmis_list to avoid randrange error + self.plugin.vmis_list = [self.mock_vmi] + + # Mock get_vmis to not clear the list + with patch.object(self.plugin, 'get_vmis'): + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock validate_environment to return False (KubeVirt not installed) + with patch.object(self.plugin, 'validate_environment', return_value=False): + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + # When validation fails, run() returns 1 due to exception handling + self.assertEqual(result, 1) + + def test_delete_vmi_timeout(self): + """ + Test timeout during VMI deletion + """ + # Initialize the plugin's custom_object_client and required attributes + self.plugin.custom_object_client = self.custom_object_client + + # Initialize original_vmi which is required by delete_vmi + self.plugin.original_vmi = self.mock_vmi.copy() + self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + + # Initialize pods_status which delete_vmi needs + from krkn_lib.models.k8s import PodsStatus, AffectedPod + 
self.plugin.pods_status = PodsStatus() + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + # Mock successful delete operation + self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={}) + + # Mock that get_vmi always returns VMI with same creationTimestamp (never gets recreated) + mock_vmi_with_time = self.mock_vmi.copy() + mock_vmi_with_time['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + + with patch.object(self.plugin, 'get_vmi', return_value=mock_vmi_with_time): + # Simulate timeout by making time.time return values that exceed the timeout + with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]): + result = self.plugin.delete_vmi("test-vm", "default", False) + + self.assertEqual(result, 1) + self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with( + group="kubevirt.io", + version="v1", + namespace="default", + plural="virtualmachineinstances", + name="test-vm" + ) + + + def test_get_vmi_api_exception_non_404(self): + """ + Test get_vmi raises ApiException for non-404 errors + """ + + # Mock API exception with non-404 status + api_error = ApiException(status=500, reason="Internal Server Error") + self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error) + + with self.assertRaises(ApiException): + self.plugin.get_vmi("test-vm", "default") + + def test_get_vmi_general_exception(self): + """ + Test get_vmi raises general exceptions + """ + # Mock general exception + self.custom_object_client.get_namespaced_custom_object = MagicMock( + side_effect=Exception("Connection error") + ) + + with self.assertRaises(Exception): + self.plugin.get_vmi("test-vm", "default") + + def test_get_vmis_with_regex_matching(self): + """ + Test get_vmis successfully filters VMIs by regex pattern + """ + # Mock namespace list + self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default", "test-ns"]) + 
+ # Mock VMI list with multiple VMIs + vmi_list = { + "items": [ + {"metadata": {"name": "test-vm-1"}, "status": {"phase": "Running"}}, + {"metadata": {"name": "test-vm-2"}, "status": {"phase": "Running"}}, + {"metadata": {"name": "other-vm"}, "status": {"phase": "Running"}}, + ] + } + self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=vmi_list) + + # Test with regex pattern that matches test-vm-* + self.plugin.get_vmis("test-vm-.*", "default") + + # Should have 4 VMs (2 per namespace * 2 namespaces) + self.assertEqual(len(self.plugin.vmis_list), 4) + # Verify only test-vm-* were added + for vmi in self.plugin.vmis_list: + self.assertTrue(vmi["metadata"]["name"].startswith("test-vm-")) + + def test_get_vmis_api_exception_404(self): + """ + Test get_vmis handles 404 ApiException gracefully + """ + + self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"]) + api_error = ApiException(status=404, reason="Not Found") + self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error) + + # Should not raise, returns empty list + result = self.plugin.get_vmis("test-vm", "default") + self.assertEqual(result, []) + + def test_get_vmis_api_exception_non_404(self): + """ + Test get_vmis raises ApiException for non-404 errors + """ + + self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"]) + api_error = ApiException(status=500, reason="Internal Server Error") + self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error) + + with self.assertRaises(ApiException): + self.plugin.get_vmis("test-vm", "default") + + def test_patch_vm_spec_success(self): + """ + Test patch_vm_spec successfully patches VM + """ + mock_vm = { + "metadata": {"name": "test-vm", "namespace": "default"}, + "spec": {"running": True} + } + + self.custom_object_client.get_namespaced_custom_object = MagicMock(return_value=mock_vm) + 
self.custom_object_client.patch_namespaced_custom_object = MagicMock(return_value=mock_vm) + + result = self.plugin.patch_vm_spec("test-vm", "default", False) + + self.assertTrue(result) + self.custom_object_client.patch_namespaced_custom_object.assert_called_once() + + def test_patch_vm_spec_api_exception(self): + """ + Test patch_vm_spec handles ApiException + """ + + api_error = ApiException(status=404, reason="Not Found") + self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error) + + result = self.plugin.patch_vm_spec("test-vm", "default", False) + + self.assertFalse(result) + + def test_patch_vm_spec_general_exception(self): + """ + Test patch_vm_spec handles general exceptions + """ + self.custom_object_client.get_namespaced_custom_object = MagicMock( + side_effect=Exception("Connection error") + ) + + result = self.plugin.patch_vm_spec("test-vm", "default", False) + + self.assertFalse(result) + + def test_delete_vmi_api_exception_404(self): + """ + Test delete_vmi handles 404 ApiException during deletion + """ + + # Initialize required attributes + self.plugin.original_vmi = self.mock_vmi.copy() + self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + self.plugin.pods_status = PodsStatus() + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + api_error = ApiException(status=404, reason="Not Found") + self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error) + + result = self.plugin.delete_vmi("test-vm", "default", False) + + self.assertEqual(result, 1) + + def test_delete_vmi_api_exception_non_404(self): + """ + Test delete_vmi handles non-404 ApiException during deletion + """ + # Initialize required attributes + self.plugin.original_vmi = self.mock_vmi.copy() + self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + self.plugin.pods_status = PodsStatus() + self.plugin.affected_pod = 
AffectedPod(pod_name="test-vm", namespace="default") + + api_error = ApiException(status=500, reason="Internal Server Error") + self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error) + + result = self.plugin.delete_vmi("test-vm", "default", False) + + self.assertEqual(result, 1) + + def test_delete_vmi_successful_recreation(self): + """ + Test delete_vmi succeeds when VMI is recreated with new creationTimestamp + """ + # Initialize required attributes - use deepcopy to avoid shared references + self.plugin.original_vmi = copy.deepcopy(self.mock_vmi) + self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + self.plugin.pods_status = PodsStatus() + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={}) + + # Mock get_vmi to return VMI with new creationTimestamp - use deepcopy + new_vmi = copy.deepcopy(self.mock_vmi) + new_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:05:00Z' + + # Use itertools to create an infinite iterator for time values + time_iter = itertools.count(0, 0.001) + + with patch.object(self.plugin, 'get_vmi', return_value=new_vmi): + with patch('time.sleep'): + with patch('time.time', side_effect=lambda: next(time_iter)): + result = self.plugin.delete_vmi("test-vm", "default", False) + + self.assertEqual(result, 0) + self.assertIsNotNone(self.plugin.affected_pod.pod_rescheduling_time) + + def test_delete_vmi_with_disable_auto_restart_failure(self): + """ + Test delete_vmi continues when patch_vm_spec fails and VMI stays deleted + """ + # Initialize required attributes + self.plugin.original_vmi = self.mock_vmi.copy() + self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z' + self.plugin.pods_status = PodsStatus() + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + # Mock patch_vm_spec to fail + with 
patch.object(self.plugin, 'patch_vm_spec', return_value=False): + self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={}) + + # Mock VMI deleted (returns None) - it will timeout waiting for recreation + with patch.object(self.plugin, 'get_vmi', return_value=None): + with patch('time.sleep'): + # Use itertools to create infinite time sequence + # Use 1.0 increment to quickly reach timeout (120 seconds) + time_iter = itertools.count(0, 1.0) + with patch('time.time', side_effect=lambda: next(time_iter)): + result = self.plugin.delete_vmi("test-vm", "default", True) + + # When VMI stays deleted (None), delete_vmi waits for recreation and times out + self.assertEqual(result, 1) + + def test_wait_for_running_timeout(self): + """ + Test wait_for_running times out when VMI doesn't reach Running state + """ + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + # Mock VMI in Pending state + pending_vmi = self.mock_vmi.copy() + pending_vmi['status']['phase'] = 'Pending' + + with patch.object(self.plugin, 'get_vmi', return_value=pending_vmi): + with patch('time.sleep'): + with patch('time.time', side_effect=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 121]): + result = self.plugin.wait_for_running("test-vm", "default", 120) + + self.assertEqual(result, 1) + + def test_wait_for_running_vmi_not_exists(self): + """ + Test wait_for_running when VMI doesn't exist yet + """ + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + + # First return None (not exists), then return running VMI + running_vmi = self.mock_vmi.copy() + running_vmi['status']['phase'] = 'Running' + + with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]): + with patch('time.sleep'): + # time.time() called: start_time (0), while loop iteration 1 (1), iteration 2 (2), iteration 3 (3), end_time (3) + with patch('time.time', side_effect=[0, 1, 2, 3, 3]): + result = 
self.plugin.wait_for_running("test-vm", "default", 120) + + self.assertEqual(result, 0) + self.assertIsNotNone(self.plugin.affected_pod.pod_readiness_time) + + def test_recover_no_original_vmi(self): + """ + Test recover fails when no original VMI is captured + """ + self.plugin.original_vmi = None + + result = self.plugin.recover("test-vm", "default", False) + + self.assertEqual(result, 1) + + def test_recover_exception_during_creation(self): + """ + Test recover handles exception during VMI creation + """ + self.plugin.original_vmi = self.mock_vmi.copy() + + self.custom_object_client.create_namespaced_custom_object = MagicMock( + side_effect=Exception("Creation failed") + ) + + with patch.object(self.plugin, 'get_vmi', return_value=None): + with patch('time.sleep'): + with patch('time.time', side_effect=[0, 301]): + result = self.plugin.recover("test-vm", "default", False) + + self.assertEqual(result, 1) + + def test_execute_scenario_missing_vm_name(self): + """ + Test execute_scenario fails when vm_name is missing + """ + config = { + "parameters": { + "namespace": "default" + } + } + + result = self.plugin.execute_scenario(config, self.scenario_telemetry) + + self.assertEqual(result, 1) + + def test_execute_scenario_vmi_not_found(self): + """ + Test execute_scenario when VMI is not found after get_vmi + """ + self.plugin.vmis_list = [self.mock_vmi] + + config = { + "parameters": { + "vm_name": "test-vm", + "namespace": "default" + } + } + + with patch.object(self.plugin, 'get_vmis'): + with patch.object(self.plugin, 'validate_environment', return_value=True): + # First get_vmi returns VMI, second returns None + with patch.object(self.plugin, 'get_vmi', side_effect=[self.mock_vmi, None]): + result = self.plugin.execute_scenario(config, self.scenario_telemetry) + + # Should be PodsStatus with unrecovered pod + self.assertIsInstance(result, type(self.plugin.pods_status)) + + def test_execute_scenario_with_kill_count(self): + """ + Test execute_scenario with 
kill_count > 1 + """ + # Create multiple VMIs + vmi_1 = self.mock_vmi.copy() + vmi_1["metadata"]["name"] = "test-vm-1" + vmi_2 = self.mock_vmi.copy() + vmi_2["metadata"]["name"] = "test-vm-2" + + self.plugin.vmis_list = [vmi_1, vmi_2] + + config = { + "parameters": { + "vm_name": "test-vm", + "namespace": "default", + "kill_count": 2 + } + } + + # Reset counters + self.delete_count = 0 + self.wait_count = 0 + + with patch.object(self.plugin, 'get_vmis'): + with patch.object(self.plugin, 'validate_environment', return_value=True): + with patch.object(self.plugin, 'get_vmi', side_effect=[vmi_1, vmi_2]): + with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_del: + with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wt: + result = self.plugin.execute_scenario(config, self.scenario_telemetry) + + # Should call delete_vmi and wait_for_running twice + self.assertEqual(mock_del.call_count, 2) + self.assertEqual(mock_wt.call_count, 2) + + def test_execute_scenario_wait_for_running_failure(self): + """ + Test execute_scenario when wait_for_running fails + """ + self.plugin.vmis_list = [self.mock_vmi] + + config = { + "parameters": { + "vm_name": "test-vm", + "namespace": "default" + } + } + + def mock_delete(*args, **kwargs): + self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default") + self.plugin.affected_pod.pod_rescheduling_time = 5.0 + return 0 + + with patch.object(self.plugin, 'get_vmis'): + with patch.object(self.plugin, 'validate_environment', return_value=True): + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + with patch.object(self.plugin, 'delete_vmi', side_effect=mock_delete): + with patch.object(self.plugin, 'wait_for_running', return_value=1): + result = self.plugin.execute_scenario(config, self.scenario_telemetry) + + # Should have unrecovered pod + self.assertEqual(len(result.unrecovered), 1) + + def test_validate_environment_exception(self): + 
""" + Test validate_environment handles exceptions + """ + self.custom_object_client.list_namespaced_custom_object = MagicMock( + side_effect=Exception("Connection error") + ) + + result = self.plugin.validate_environment("test-vm", "default") + + self.assertFalse(result) + + def test_validate_environment_vmi_not_found(self): + """ + Test validate_environment when VMI doesn't exist + """ + # Mock CRDs exist + mock_crd_list = MagicMock() + mock_crd_list.items = MagicMock(return_value=["item1"]) + self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=mock_crd_list) + + # Mock VMI not found + with patch.object(self.plugin, 'get_vmi', return_value=None): + result = self.plugin.validate_environment("test-vm", "default") + + self.assertFalse(result) + + def test_init_clients(self): + """ + Test init_clients initializes k8s client correctly + """ + + mock_k8s = MagicMock(spec=KrknKubernetes) + mock_custom_client = MagicMock() + mock_k8s.custom_object_client = mock_custom_client + + self.plugin.init_clients(mock_k8s) + + self.assertEqual(self.plugin.k8s_client, mock_k8s) + self.assertEqual(self.plugin.custom_object_client, mock_custom_client) + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["kubevirt_vm_outage"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_managed_cluster_scenario_plugin.py b/tests/test_managed_cluster_scenario_plugin.py new file mode 100644 index 00000000..da5b8d6f --- /dev/null +++ b/tests/test_managed_cluster_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for ManagedClusterScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_managed_cluster_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s 
import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.managed_cluster.managed_cluster_scenario_plugin import ManagedClusterScenarioPlugin + + +class TestManagedClusterScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ManagedClusterScenarioPlugin + """ + self.plugin = ManagedClusterScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["managedcluster_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_native_scenario_plugin.py b/tests/test_native_scenario_plugin.py new file mode 100644 index 00000000..5d73059b --- /dev/null +++ b/tests/test_native_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for NativeScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_native_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.native.native_scenario_plugin import NativeScenarioPlugin + + +class TestNativeScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for NativeScenarioPlugin + """ + self.plugin = NativeScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario types + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["pod_network_scenarios", "ingress_node_scenarios"]) + self.assertEqual(len(result), 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_network_chaos_ng_scenario_plugin.py b/tests/test_network_chaos_ng_scenario_plugin.py new file mode 100644 index 00000000..d971e696 --- 
/dev/null +++ b/tests/test_network_chaos_ng_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for NetworkChaosNgScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_network_chaos_ng_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.network_chaos_ng.network_chaos_ng_scenario_plugin import NetworkChaosNgScenarioPlugin + + +class TestNetworkChaosNgScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for NetworkChaosNgScenarioPlugin + """ + self.plugin = NetworkChaosNgScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["network_chaos_ng_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_network_chaos_scenario_plugin.py b/tests/test_network_chaos_scenario_plugin.py new file mode 100644 index 00000000..ac35c43b --- /dev/null +++ b/tests/test_network_chaos_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for NetworkChaosScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_network_chaos_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.network_chaos.network_chaos_scenario_plugin import NetworkChaosScenarioPlugin + + +class TestNetworkChaosScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for NetworkChaosScenarioPlugin + """ + self.plugin = NetworkChaosScenarioPlugin() + + def 
test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["network_chaos_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_node_actions_scenario_plugin.py b/tests/test_node_actions_scenario_plugin.py new file mode 100644 index 00000000..2db95d50 --- /dev/null +++ b/tests/test_node_actions_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for NodeActionsScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_node_actions_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.node_actions.node_actions_scenario_plugin import NodeActionsScenarioPlugin + + +class TestNodeActionsScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for NodeActionsScenarioPlugin + """ + self.plugin = NodeActionsScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["node_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pod_disruption_scenario_plugin.py b/tests/test_pod_disruption_scenario_plugin.py new file mode 100644 index 00000000..6c069f16 --- /dev/null +++ b/tests/test_pod_disruption_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for PodDisruptionScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_pod_disruption_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import 
KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.pod_disruption.pod_disruption_scenario_plugin import PodDisruptionScenarioPlugin + + +class TestPodDisruptionScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for PodDisruptionScenarioPlugin + """ + self.plugin = PodDisruptionScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["pod_disruption_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pvc_scenario_plugin.py b/tests/test_pvc_scenario_plugin.py new file mode 100644 index 00000000..053cc4b5 --- /dev/null +++ b/tests/test_pvc_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for PvcScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_pvc_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.pvc.pvc_scenario_plugin import PvcScenarioPlugin + + +class TestPvcScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for PvcScenarioPlugin + """ + self.plugin = PvcScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["pvc_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_service_disruption_scenario_plugin.py b/tests/test_service_disruption_scenario_plugin.py new file mode 100644 index 00000000..dba567d9 --- /dev/null +++ b/tests/test_service_disruption_scenario_plugin.py @@ -0,0 +1,39 
@@ +#!/usr/bin/env python3 + +""" +Test suite for ServiceDisruptionScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_service_disruption_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.service_disruption.service_disruption_scenario_plugin import ServiceDisruptionScenarioPlugin + + +class TestServiceDisruptionScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ServiceDisruptionScenarioPlugin + """ + self.plugin = ServiceDisruptionScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["service_disruption_scenarios"]) + self.assertEqual(len(result), 1) + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_service_hijacking_scenario_plugin.py b/tests/test_service_hijacking_scenario_plugin.py new file mode 100644 index 00000000..c5644242 --- /dev/null +++ b/tests/test_service_hijacking_scenario_plugin.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +""" +Test suite for ServiceHijackingScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_service_hijacking_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest + +from krkn.scenario_plugins.service_hijacking.service_hijacking_scenario_plugin import ServiceHijackingScenarioPlugin + + +class TestServiceHijackingScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ServiceHijackingScenarioPlugin + """ + self.plugin = ServiceHijackingScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, 
["service_hijacking_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_shut_down_scenario_plugin.py b/tests/test_shut_down_scenario_plugin.py new file mode 100644 index 00000000..f25deef0 --- /dev/null +++ b/tests/test_shut_down_scenario_plugin.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +""" +Test suite for ShutDownScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_shut_down_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest + +from krkn.scenario_plugins.shut_down.shut_down_scenario_plugin import ShutDownScenarioPlugin + + +class TestShutDownScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ShutDownScenarioPlugin + """ + self.plugin = ShutDownScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["cluster_shut_down_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_syn_flood_scenario_plugin.py b/tests/test_syn_flood_scenario_plugin.py new file mode 100644 index 00000000..412380aa --- /dev/null +++ b/tests/test_syn_flood_scenario_plugin.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +""" +Test suite for SynFloodScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_syn_flood_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest + +from krkn.scenario_plugins.syn_flood.syn_flood_scenario_plugin import SynFloodScenarioPlugin + + +class TestSynFloodScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for SynFloodScenarioPlugin + """ + self.plugin = SynFloodScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = 
self.plugin.get_scenario_types() + + self.assertEqual(result, ["syn_flood_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_time_actions_scenario_plugin.py b/tests/test_time_actions_scenario_plugin.py new file mode 100644 index 00000000..3a2dc4d8 --- /dev/null +++ b/tests/test_time_actions_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for TimeActionsScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_time_actions_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.time_actions.time_actions_scenario_plugin import TimeActionsScenarioPlugin + + +class TestTimeActionsScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for TimeActionsScenarioPlugin + """ + self.plugin = TimeActionsScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["time_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_virt_checker.py b/tests/test_virt_checker.py new file mode 100644 index 00000000..b057e906 --- /dev/null +++ b/tests/test_virt_checker.py @@ -0,0 +1,511 @@ +#!/usr/bin/env python3 + +""" +Test suite for VirtChecker class + +This test file provides comprehensive coverage for the main functionality of VirtChecker: +- Initialization with various configurations +- VM access checking (both virtctl and disconnected modes) +- Disconnected mode with IP/node changes +- Thread management +- Post-check validation + +Usage: + python -m coverage run -a -m unittest tests/test_virt_checker.py -v + +Note: This test file uses mocks 
extensively to avoid needing actual Kubernetes/KubeVirt infrastructure. + +Created By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock, patch +import sys +from krkn.utils.VirtChecker import VirtChecker +import os + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +# Create a mock VirtCheck class before any imports +class MockVirtCheck: + """Mock VirtCheck class for testing""" + def __init__(self, data): + self.vm_name = data.get('vm_name', '') + self.ip_address = data.get('ip_address', '') + self.namespace = data.get('namespace', '') + self.node_name = data.get('node_name', '') + self.new_ip_address = data.get('new_ip_address', '') + self.status = data.get('status', False) + self.start_timestamp = data.get('start_timestamp', '') + self.end_timestamp = data.get('end_timestamp', '') + self.duration = data.get('duration', 0) + + +class TestVirtChecker(unittest.TestCase): + """Test suite for VirtChecker class""" + + def setUp(self): + """Set up test fixtures before each test method""" + self.mock_krkn_lib = MagicMock() + + # Mock VMI data + self.mock_vmi_1 = { + "metadata": {"name": "test-vm-1", "namespace": "test-namespace"}, + "status": { + "nodeName": "worker-1", + "interfaces": [{"ipAddress": "192.168.1.10"}] + } + } + + self.mock_vmi_2 = { + "metadata": {"name": "test-vm-2", "namespace": "test-namespace"}, + "status": { + "nodeName": "worker-2", + "interfaces": [{"ipAddress": "192.168.1.11"}] + } + } + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_init_with_empty_namespace(self, mock_plugin_class, mock_yaml): + """Test VirtChecker initialization with empty namespace (should skip checks)""" + def yaml_getter(config, key, default): + if key == "namespace": + return "" + return default + mock_yaml.side_effect = yaml_getter + + checker = VirtChecker( + 
{"namespace": ""}, + iterations=5, + krkn_lib=self.mock_krkn_lib + ) + + # Should set batch_size to 0 and not initialize plugin + self.assertEqual(checker.batch_size, 0) + mock_plugin_class.assert_not_called() + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_regex_namespace(self, mock_plugin_class, mock_yaml): + """Test VirtChecker initialization with regex namespace pattern""" + # Setup mock plugin with VMI data + mock_plugin = MagicMock() + mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + return config.get(key, default) + mock_yaml.side_effect = yaml_getter + + checker = VirtChecker( + {"namespace": "test-*"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + self.assertGreater(len(checker.vm_list), 0) + self.assertEqual(len(checker.vm_list), 2) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_with_vm_name(self, mock_plugin_class, mock_yaml): + """Test VirtChecker initialization with specific VM names""" + # Setup mock plugin with VMI data + mock_plugin = MagicMock() + mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + return config.get(key, default) + mock_yaml.side_effect = yaml_getter + + # Test with VM name pattern + checker = VirtChecker( + {"namespace": "test-namespace", "name": "test-vm-.*"}, + iterations=5, + krkn_lib=self.mock_krkn_lib + ) + + self.assertGreater(checker.batch_size, 0) + self.assertEqual(len(checker.vm_list), 2) + + # Test with specific VM name + + mock_plugin = MagicMock() + mock_plugin.vmis_list = [self.mock_vmi_2] + 
mock_plugin_class.return_value = mock_plugin + checker2 = VirtChecker( + {"namespace": "test-namespace", "name": "test-vm-1"}, + iterations=5, + krkn_lib=self.mock_krkn_lib + ) + + self.assertGreater(checker2.batch_size, 0) + self.assertEqual(len(checker2.vm_list), 1) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_with_node_names(self, mock_plugin_class, mock_yaml): + """Test VirtChecker initialization filtering by node names""" + # Setup mock plugin with VMI data + mock_plugin = MagicMock() + mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + return config.get(key, default) + mock_yaml.side_effect = yaml_getter + + # Test filtering by node name - should only include VMs on worker-2 + checker = VirtChecker( + {"namespace": "test-namespace", "node_names": "worker-2"}, + iterations=5, + krkn_lib=self.mock_krkn_lib + ) + + self.assertGreater(checker.batch_size, 0) + # Only test-vm-2 is on worker-2, so vm_list should have 1 VM + self.assertEqual(len(checker.vm_list), 1) + self.assertEqual(checker.vm_list[0].vm_name, "test-vm-2") + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_get_vm_access_success(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test get_vm_access returns True when VM is accessible""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + # Mock successful access + mock_invoke.return_value = 
"True" + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + result = checker.get_vm_access("test-vm", "test-namespace") + + self.assertTrue(result) + # Should try first command and succeed + self.assertGreaterEqual(mock_invoke.call_count, 1) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_get_vm_access_failure(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test get_vm_access returns False when VM is not accessible""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + # Mock failed access + mock_invoke.return_value = "False" + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + result = checker.get_vm_access("test-vm", "test-namespace") + + self.assertFalse(result) + # Should try both commands + self.assertEqual(mock_invoke.call_count, 2) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_check_disconnected_access_success(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test check_disconnected_access with successful connection""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + # Mock successful disconnected access + mock_invoke.side_effect = 
["some output", "True"] + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + result, new_ip, new_node = checker.check_disconnected_access( + "192.168.1.10", + "worker-1", + "test-vm" + ) + + self.assertTrue(result) + self.assertIsNone(new_ip) + self.assertIsNone(new_node) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_check_disconnected_access_with_new_ip(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test check_disconnected_access when VM has new IP address""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + # Mock failed first attempt, successful second with new IP + mock_invoke.side_effect = ["some output", "False", "True"] + + mock_vmi = { + "status": { + "nodeName": "worker-1", + "interfaces": [{"ipAddress": "192.168.1.20"}] + } + } + mock_plugin.get_vmi = MagicMock(return_value=mock_vmi) + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + checker.kube_vm_plugin = mock_plugin + + result, new_ip, new_node = checker.check_disconnected_access( + "192.168.1.10", + "worker-1", + "test-vm" + ) + + self.assertTrue(result) + self.assertEqual(new_ip, "192.168.1.20") + self.assertIsNone(new_node) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_check_disconnected_access_with_new_node(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test 
check_disconnected_access when VM moved to new node""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + # Mock failed attempts, successful on new node + # Call sequence: debug_check, initial_check, check_on_new_node + mock_invoke.side_effect = ["some output", "False", "True"] + + mock_vmi = { + "status": { + "nodeName": "worker-2", + "interfaces": [{"ipAddress": "192.168.1.10"}] + } + } + mock_plugin.get_vmi = MagicMock(return_value=mock_vmi) + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + checker.kube_vm_plugin = mock_plugin + + result, new_ip, new_node = checker.check_disconnected_access( + "192.168.1.10", + "worker-1", + "test-vm" + ) + + self.assertTrue(result) + self.assertEqual(new_ip, "192.168.1.10") + self.assertEqual(new_node, "worker-2") + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + @patch('krkn.utils.VirtChecker.invoke_no_exit') + def test_check_disconnected_access_with_ssh_node_fallback(self, mock_invoke, mock_plugin_class, mock_yaml): + """Test check_disconnected_access falls back to ssh_node""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + elif key == "ssh_node": + return "worker-0" + return default + mock_yaml.side_effect = yaml_getter + + # Mock failed attempts on original node, successful on ssh_node fallback + # Call sequence: debug_check, initial_check_on_worker-1, fallback_check_on_ssh_node + # Since IP and node haven't changed, it goes directly to ssh_node fallback + mock_invoke.side_effect = 
["some output", "False", "True"] + + mock_vmi = { + "status": { + "nodeName": "worker-1", + "interfaces": [{"ipAddress": "192.168.1.10"}] + } + } + mock_plugin.get_vmi = MagicMock(return_value=mock_vmi) + + checker = VirtChecker( + {"namespace": "test-ns", "ssh_node": "worker-0"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + checker.kube_vm_plugin = mock_plugin + + result, new_ip, new_node = checker.check_disconnected_access( + "192.168.1.10", + "worker-1", + "test-vm" + ) + + self.assertTrue(result) + self.assertEqual(new_ip, "192.168.1.10") + self.assertIsNone(new_node) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_thread_join(self, mock_plugin_class, mock_yaml): + """Test thread_join waits for all threads""" + mock_plugin = MagicMock() + mock_plugin.vmis_list = [] + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + checker = VirtChecker( + {"namespace": "test-ns"}, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + # Create mock threads + mock_thread_1 = MagicMock() + mock_thread_2 = MagicMock() + checker.threads = [mock_thread_1, mock_thread_2] + + checker.thread_join() + + mock_thread_1.join.assert_called_once() + mock_thread_2.join.assert_called_once() + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_init_exception_handling(self, mock_plugin_class, mock_yaml): + """Test VirtChecker handles exceptions during initialization""" + mock_plugin = MagicMock() + mock_plugin.init_clients.side_effect = Exception("Connection error") + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, 
key, default): + if key == "namespace": + return "test-ns" + return default + mock_yaml.side_effect = yaml_getter + + config = {"namespace": "test-ns"} + + # Should not raise exception + checker = VirtChecker( + config, + iterations=1, + krkn_lib=self.mock_krkn_lib + ) + + # VM list should be empty due to exception + self.assertEqual(len(checker.vm_list), 0) + + @patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck) + @patch('krkn.utils.VirtChecker.get_yaml_item_value') + @patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin') + def test_batch_size_calculation(self, mock_plugin_class, mock_yaml): + """Test batch size calculation based on VM count and thread limit""" + mock_plugin = MagicMock() + + # Create 25 mock VMIs + mock_vmis = [] + for i in range(25): + vmi = { + "metadata": {"name": f"vm-{i}", "namespace": "test-ns"}, + "status": { + "nodeName": "worker-1", + "interfaces": [{"ipAddress": f"192.168.1.{i}"}] + } + } + mock_vmis.append(vmi) + + mock_plugin.vmis_list = mock_vmis + mock_plugin_class.return_value = mock_plugin + + def yaml_getter(config, key, default): + if key == "namespace": + return "test-ns" + elif key == "node_names": + return "" + return default + mock_yaml.side_effect = yaml_getter + + config = {"namespace": "test-ns"} + checker = VirtChecker( + config, + iterations=5, + krkn_lib=self.mock_krkn_lib, + threads_limit=10 + ) + + # 25 VMs / 10 threads = 3 VMs per batch (ceiling) + self.assertEqual(checker.batch_size, 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_zone_outage_scenario_plugin.py b/tests/test_zone_outage_scenario_plugin.py new file mode 100644 index 00000000..f7369797 --- /dev/null +++ b/tests/test_zone_outage_scenario_plugin.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 + +""" +Test suite for ZoneOutageScenarioPlugin class + +Usage: + python -m coverage run -a -m unittest tests/test_zone_outage_scenario_plugin.py -v + +Assisted By: Claude Code +""" + +import unittest +from 
unittest.mock import MagicMock + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift + +from krkn.scenario_plugins.zone_outage.zone_outage_scenario_plugin import ZoneOutageScenarioPlugin + + +class TestZoneOutageScenarioPlugin(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures for ZoneOutageScenarioPlugin + """ + self.plugin = ZoneOutageScenarioPlugin() + + def test_get_scenario_types(self): + """ + Test get_scenario_types returns correct scenario type + """ + result = self.plugin.get_scenario_types() + + self.assertEqual(result, ["zone_outages_scenarios"]) + self.assertEqual(len(result), 1) + + +if __name__ == "__main__": + unittest.main()