diff --git a/config/config.yaml b/config/config.yaml
index 525c4292..5eec9687 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -47,6 +47,8 @@ kraken:
             - scenarios/kube/syn_flood.yaml
         - network_chaos_ng_scenarios:
             - scenarios/kube/network-filter.yml
+        - kubevirt_vm_outage:
+            - scenarios/kubevirt/kubevirt-vm-outage.yaml
 
 cerberus:
     cerberus_enabled: False                                     # Enable it when cerberus is previously installed
diff --git a/krkn/__init__.py b/krkn/scenario_plugins/kubevirt_vm_outage/__init__.py
similarity index 100%
rename from krkn/__init__.py
rename to krkn/scenario_plugins/kubevirt_vm_outage/__init__.py
diff --git a/krkn/scenario_plugins/kubevirt_vm_outage/kubevirt_vm_outage_scenario_plugin.py b/krkn/scenario_plugins/kubevirt_vm_outage/kubevirt_vm_outage_scenario_plugin.py
new file mode 100644
index 00000000..bd1d2d28
--- /dev/null
+++ b/krkn/scenario_plugins/kubevirt_vm_outage/kubevirt_vm_outage_scenario_plugin.py
@@ -0,0 +1,339 @@
+import logging
+import time
+from typing import Dict, Any, Optional
+
+import yaml
+from kubernetes.client.rest import ApiException
+from krkn_lib.k8s import KrknKubernetes
+from krkn_lib.models.telemetry import ScenarioTelemetry
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+from krkn_lib.utils import log_exception
+
+from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
+
+
+class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
+    """
+    A scenario plugin that injects chaos by deleting a KubeVirt Virtual Machine Instance (VMI).
+    This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
+    """
+
+    def __init__(self):
+        self.k8s_client = None
+        self.custom_object_client = None
+        self.original_vmi = None
+
+    # Scenario type under which this plugin is registered in the krkn config
+    def get_scenario_types(self) -> list[str]:
+        return ["kubevirt_vm_outage"]
+
+    def run(
+        self,
+        run_uuid: str,
+        scenario: str,
+        krkn_config: dict[str, any],
+        lib_telemetry: KrknTelemetryOpenshift,
+        scenario_telemetry: ScenarioTelemetry,
+    ) -> int:
+        """
+        Main entry point for the plugin.
+        Parses the scenario configuration and executes the chaos scenario.
+        """
+        try:
+            with open(scenario, "r") as f:
+                scenario_config = yaml.full_load(f)
+
+            self.init_clients(lib_telemetry.get_lib_kubernetes())
+
+            for config in scenario_config["scenarios"]:
+                if config.get("scenario") == "kubevirt_vm_outage":
+                    result = self.execute_scenario(config, scenario_telemetry)
+                    if result != 0:
+                        return 1
+
+            return 0
+        except Exception as e:
+            logging.error(f"KubeVirt VM Outage scenario failed: {e}")
+            log_exception(e)
+            return 1
+
+    def init_clients(self, k8s_client: KrknKubernetes):
+        """
+        Initialize the Kubernetes client used for KubeVirt operations.
+        """
+        self.k8s_client = k8s_client
+        self.custom_object_client = k8s_client.custom_object_client
+        logging.info("Successfully initialized Kubernetes client for KubeVirt operations")
+
+    def get_vmi(self, name: str, namespace: str) -> Optional[Dict]:
+        """
+        Get a Virtual Machine Instance by name and namespace.
+
+        :param name: Name of the VMI to retrieve
+        :param namespace: Namespace of the VMI
+        :return: The VMI object if found, None otherwise
+        """
+        try:
+            vmi = self.custom_object_client.get_namespaced_custom_object(
+                group="kubevirt.io",
+                version="v1",
+                namespace=namespace,
+                plural="virtualmachineinstances",
+                name=name
+            )
+            return vmi
+        except ApiException as e:
+            if e.status == 404:
+                logging.warning(f"VMI {name} not found in namespace {namespace}")
+                return None
+            else:
+                logging.error(f"Error getting VMI {name}: {e}")
+                raise
+        except Exception as e:
+            logging.error(f"Unexpected error getting VMI {name}: {e}")
+            raise
+
+    def execute_scenario(self, config: Dict[str, Any], scenario_telemetry: ScenarioTelemetry) -> int:
+        """
+        Execute a KubeVirt VM outage scenario based on the provided configuration.
+
+        :param config: The scenario configuration
+        :param scenario_telemetry: The telemetry object for recording metrics
+        :return: 0 for success, 1 for failure
+        """
+        try:
+            params = config.get("parameters", {})
+            vm_name = params.get("vm_name")
+            namespace = params.get("namespace", "default")
+            timeout = params.get("timeout", 60)
+            disable_auto_restart = params.get("disable_auto_restart", False)
+
+            if not vm_name:
+                logging.error("vm_name parameter is required")
+                return 1
+
+            logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
+
+            if not self.validate_environment(vm_name, namespace):
+                return 1
+
+            vmi = self.get_vmi(vm_name, namespace)
+            if not vmi:
+                logging.error(f"VMI {vm_name} not found in namespace {namespace}")
+                return 1
+
+            self.original_vmi = vmi
+            logging.info(f"Captured initial state of VMI: {vm_name}")
+
+            result = self.inject(vm_name, namespace, disable_auto_restart)
+            if result != 0:
+                return 1
+
+            # Wait for the VMI to come back on its own; if it doesn't, fall back to manual recovery
+            result = self.wait_for_running(vm_name, namespace, timeout)
+            if result != 0:
+                logging.warning(f"VMI {vm_name} did not reach Running state within {timeout}s, attempting manual recovery")
+                if self.recover(vm_name, namespace, disable_auto_restart) != 0:
+                    return 1
+
+            logging.info(f"Successfully completed KubeVirt VM outage scenario for VM: {vm_name}")
+            return 0
+
+        except Exception as e:
+            logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
+            log_exception(e)
+            return 1
+
+    def validate_environment(self, vm_name: str, namespace: str) -> bool:
+        """
+        Validate that KubeVirt is installed and the specified VM exists.
+
+        :param vm_name: Name of the VM to validate
+        :param namespace: Namespace of the VM
+        :return: True if environment is valid, False otherwise
+        """
+        try:
+            # Check that the KubeVirt API is available: listing VirtualMachines raises an
+            # ApiException when the KubeVirt CRDs are not installed in the cluster
+            try:
+                self.custom_object_client.list_namespaced_custom_object(
+                    group="kubevirt.io",
+                    version="v1",
+                    namespace=namespace,
+                    plural="virtualmachines",
+                )
+            except ApiException:
+                logging.error("KubeVirt CRDs not found. Ensure KubeVirt/CNV is installed in the cluster")
+                return False
+
+            # Check if VMI exists
+            vmi = self.get_vmi(vm_name, namespace)
+            if not vmi:
+                logging.error(f"VMI {vm_name} not found in namespace {namespace}")
+                return False
+
+            logging.info(f"Validated environment: KubeVirt is installed and VMI {vm_name} exists")
+            return True
+
+        except Exception as e:
+            logging.error(f"Error validating environment: {e}")
+            return False
+
+    def patch_vm_spec(self, vm_name: str, namespace: str, running: bool) -> bool:
+        """
+        Patch the VM spec to enable or disable auto-restart.
+
+        :param vm_name: Name of the VM to patch
+        :param namespace: Namespace of the VM
+        :param running: Whether the VM should be set to running state
+        :return: True if patch was successful, False otherwise
+        """
+        try:
+            # Get the VM object first to get its current spec
+            vm = self.custom_object_client.get_namespaced_custom_object(
+                group="kubevirt.io",
+                version="v1",
+                namespace=namespace,
+                plural="virtualmachines",
+                name=vm_name
+            )
+
+            # Update the running state
+            if 'spec' not in vm:
+                vm['spec'] = {}
+            vm['spec']['running'] = running
+
+            # Apply the patch
+            self.custom_object_client.patch_namespaced_custom_object(
+                group="kubevirt.io",
+                version="v1",
+                namespace=namespace,
+                plural="virtualmachines",
+                name=vm_name,
+                body=vm
+            )
+            return True
+
+        except ApiException as e:
+            logging.error(f"Failed to patch VM {vm_name}: {e}")
+            return False
+        except Exception as e:
+            logging.error(f"Unexpected error patching VM {vm_name}: {e}")
+            return False
+
+    def inject(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
+        """
+        Delete a Virtual Machine Instance to simulate a VM outage.
+
+        :param vm_name: Name of the VMI to delete
+        :param namespace: Namespace of the VMI
+        :param disable_auto_restart: Whether to set spec.running=False on the owning VM before deleting the VMI
+        :return: 0 for success, 1 for failure
+        """
+        try:
+            logging.info(f"Injecting chaos: Deleting VMI {vm_name} in namespace {namespace}")
+
+            # If auto-restart should be disabled, patch the VM spec first
+            if disable_auto_restart:
+                logging.info(f"Disabling auto-restart for VM {vm_name} by setting spec.running=False")
+                if not self.patch_vm_spec(vm_name, namespace, running=False):
+                    logging.error("Failed to disable auto-restart for VM"
+                                  " - proceeding with deletion but VM may auto-restart")
+
+            start_creation_time = (self.original_vmi or {}).get('metadata', {}).get('creationTimestamp')
+            try:
+                self.custom_object_client.delete_namespaced_custom_object(
+                    group="kubevirt.io",
+                    version="v1",
+                    namespace=namespace,
+                    plural="virtualmachineinstances",
+                    name=vm_name
+                )
+            except ApiException as e:
+                if e.status == 404:
+                    logging.warning(f"VMI {vm_name} not found during deletion")
+                    return 1
+                else:
+                    logging.error(f"API error during VMI deletion: {e}")
+                    return 1
+
+            # Wait for the VMI to disappear, or to be recreated by the VM controller
+            # (detected by a changed creationTimestamp)
+            timeout = 120  # seconds
+            start_time = time.time()
+            while time.time() - start_time < timeout:
+                deleted_vmi = self.get_vmi(vm_name, namespace)
+                if deleted_vmi:
+                    if start_creation_time != deleted_vmi.get('metadata', {}).get('creationTimestamp'):
+                        logging.info(f"VMI {vm_name} successfully recreated")
+                        return 0
+                else:
+                    logging.info(f"VMI {vm_name} successfully deleted")
+                    return 0
+                time.sleep(1)
+
+            logging.error(f"Timed out waiting for VMI {vm_name} to be deleted")
+            return 1
+
+        except Exception as e:
+            logging.error(f"Error deleting VMI {vm_name}: {e}")
+            log_exception(e)
+            return 1
+
+    def wait_for_running(self, vm_name: str, namespace: str, timeout: int = 120) -> int:
+        """
+        Poll the VMI until it reaches the Running phase or the timeout expires.
+
+        :return: 0 if the VMI is Running within the timeout, 1 otherwise
+        """
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            vmi = self.get_vmi(vm_name, namespace)
+
+            if vmi:
+                if vmi.get('status', {}).get('phase') == "Running":
+                    logging.info(f"VMI {vm_name} is running")
+                    return 0
+                logging.info(f"VMI {vm_name} exists but is not in Running state. Current state: {vmi.get('status', {}).get('phase')}")
+            else:
+                logging.info(f"VMI {vm_name} not yet recreated")
+            time.sleep(1)
+        return 1
+
+    def recover(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
+        """
+        Recover a deleted VMI, either by waiting for auto-recovery or manually recreating it.
+
+        :param vm_name: Name of the VMI to recover
+        :param namespace: Namespace of the VMI
+        :param disable_auto_restart: Whether auto-restart was disabled during injection
+        :return: 0 for success, 1 for failure
+        """
+        try:
+            logging.info(f"Attempting to recover VMI {vm_name} in namespace {namespace}")
+
+            if self.original_vmi:
+                logging.info(f"Auto-recovery didn't occur for VMI {vm_name}. Attempting manual recreation")
+
+                try:
+                    # Clean up server-generated fields
+                    vmi_dict = self.original_vmi.copy()
+                    if 'metadata' in vmi_dict:
+                        metadata = vmi_dict['metadata']
+                        for field in ['resourceVersion', 'uid', 'creationTimestamp', 'generation']:
+                            if field in metadata:
+                                del metadata[field]
+
+                    # Create the VMI
+                    self.custom_object_client.create_namespaced_custom_object(
+                        group="kubevirt.io",
+                        version="v1",
+                        namespace=namespace,
+                        plural="virtualmachineinstances",
+                        body=vmi_dict
+                    )
+                    logging.info(f"Successfully recreated VMI {vm_name}")
+
+                    # Wait for the recreated VMI to start running
+                    if self.wait_for_running(vm_name, namespace) != 0:
+                        logging.warning(f"VMI {vm_name} was recreated but didn't reach Running state in time")
+
+                    return 0  # Still consider it a success as the VMI was recreated
+
+                except Exception as e:
+                    logging.error(f"Error recreating VMI {vm_name}: {e}")
+                    log_exception(e)
+                    return 1
+            else:
+                logging.error(f"Failed to recover VMI {vm_name}: No original state captured and auto-recovery did not occur")
+                return 1
+
+        except Exception as e:
+            logging.error(f"Unexpected error recovering VMI {vm_name}: {e}")
+            log_exception(e)
+            return 1
diff --git a/scenarios/kubevirt/kubevirt-vm-outage.yaml b/scenarios/kubevirt/kubevirt-vm-outage.yaml
new file mode 100644
index 00000000..66bd3d73
--- /dev/null
+++ b/scenarios/kubevirt/kubevirt-vm-outage.yaml
@@ -0,0 +1,7 @@
+scenarios:
+  - name: "kubevirt outage test"
+    scenario: kubevirt_vm_outage
+    parameters:
+      vm_name:
+      namespace:
+      timeout: 60
diff --git a/tests/kubevirt_vm_outage/test_kubevirt_vm_outage.py b/tests/kubevirt_vm_outage/test_kubevirt_vm_outage.py
new file mode 100644
index 00000000..ccfdc2f0
--- /dev/null
+++ b/tests/kubevirt_vm_outage/test_kubevirt_vm_outage.py
@@ -0,0 +1,215 @@
+import unittest
+import time
+from unittest.mock import MagicMock, patch
+
+import yaml
+from kubernetes.client.rest import ApiException
+from krkn_lib.k8s import KrknKubernetes
+from krkn_lib.models.telemetry import ScenarioTelemetry
+from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
+
+from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
+
+
+class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
+
+    def setUp(self):
+        """
+        Set up test fixtures for KubevirtVmOutageScenarioPlugin
+        """
+        self.plugin = KubevirtVmOutageScenarioPlugin()
+
+        # Create mock k8s client
+        self.k8s_client = MagicMock()
+        self.custom_object_client = MagicMock()
+        self.k8s_client.custom_object_client = self.custom_object_client
+        self.plugin.k8s_client = self.k8s_client
+        self.plugin.custom_object_client = self.custom_object_client
+
+        # Mock methods needed for KubeVirt operations
+        self.k8s_client.list_custom_resource_definition = MagicMock()
+
+        # Mock custom resource definition list with KubeVirt CRDs
+        crd_list = MagicMock()
+        crd_item = MagicMock()
+        crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io" + crd_list.items = [crd_item] + self.k8s_client.list_custom_resource_definition.return_value = crd_list + + # Mock VMI data + self.mock_vmi = { + "metadata": { + "name": "test-vm", + "namespace": "default" + }, + "status": { + "phase": "Running" + } + } + + # Create test config + self.config = { + "scenarios": [ + { + "name": "kubevirt outage test", + "scenario": "kubevirt_vm_outage", + "parameters": { + "vm_name": "test-vm", + "namespace": "default", + "duration": 0 + } + } + ] + } + + # Create a temporary config file + import tempfile, os + temp_dir = tempfile.gettempdir() + self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml") + with open(self.scenario_file, "w") as f: + yaml.dump(self.config, f) + + # Mock dependencies + self.telemetry = MagicMock(spec=KrknTelemetryOpenshift) + self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry) + self.telemetry.get_lib_kubernetes.return_value = self.k8s_client + + def test_successful_injection_and_recovery(self): + """ + Test successful deletion and recovery of a VMI + """ + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock inject and recover to simulate success + with patch.object(self.plugin, 'inject', return_value=0) as mock_inject: + with patch.object(self.plugin, 'recover', return_value=0) as mock_recover: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 0) + mock_inject.assert_called_once_with("test-vm", "default", False) + mock_recover.assert_called_once_with("test-vm", "default", False) + + def test_injection_failure(self): + """ + Test failure during VMI deletion + """ + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock inject to simulate failure + with patch.object(self.plugin, 'inject', return_value=1) as mock_inject: + with patch.object(self.plugin, 'recover', return_value=0) as mock_recover: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 1) + mock_inject.assert_called_once_with("test-vm", "default", False) + mock_recover.assert_not_called() + + def test_disable_auto_restart(self): + """ + Test VM auto-restart can be disabled + """ + # Configure test with disable_auto_restart=True + self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True + + # Mock VM object for patching + mock_vm = { + "metadata": {"name": "test-vm", "namespace": "default"}, + "spec": {} + } + + # Mock get_vmi to return our mock VMI + with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi): + # Mock VM patch operation + with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm: + mock_patch_vm.return_value = True + # Mock inject and recover + with patch.object(self.plugin, 'inject', return_value=0) as mock_inject: + with patch.object(self.plugin, 'recover', return_value=0) as mock_recover: + with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))): + result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry) + + self.assertEqual(result, 0) + # Should call patch_vm_spec to disable auto-restart + 
+
+    def test_recovery_when_vmi_does_not_exist(self):
+        """
+        Test recovery logic when VMI does not exist after deletion
+        """
+        # Store the original VMI in the plugin for recovery
+        self.plugin.original_vmi = self.mock_vmi.copy()
+
+        # Create a cleaned vmi_dict as the plugin would
+        vmi_dict = self.mock_vmi.copy()
+
+        # Set up running VMI data for after recovery
+        running_vmi = {
+            "metadata": {"name": "test-vm", "namespace": "default"},
+            "status": {"phase": "Running"}
+        }
+
+        # Set up time.time to immediately exceed the timeout for auto-recovery
+        with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
+            # Mock get_vmi to always return None (not auto-recovered)
+            with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
+                # Mock the custom object API to return success
+                self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
+
+                # Run recovery with mocked time.sleep
+                with patch('time.sleep'):
+                    result = self.plugin.recover("test-vm", "default", False)
+
+        self.assertEqual(result, 0)
+        # Verify create was called with the right arguments for our API version and kind
+        self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
+            group="kubevirt.io",
+            version="v1",
+            namespace="default",
+            plural="virtualmachineinstances",
+            body=vmi_dict
+        )
+
+    def test_validation_failure(self):
+        """
+        Test validation failure when KubeVirt is not installed
+        """
+        # Simulate a cluster without the KubeVirt CRDs: listing VirtualMachines raises a 404
+        self.custom_object_client.list_namespaced_custom_object.side_effect = ApiException(status=404)
+
+        with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
+            result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
+
+        self.assertEqual(result, 1)
+
+    def test_delete_vmi_timeout(self):
+        """
+        Test timeout during VMI deletion
+        """
+        # Mock successful delete operation
+        self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
+
+        # Mock that get_vmi always returns the VMI (it never gets deleted)
+        with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
+            # Simulate timeout by making time.time return values that exceed the timeout
+            with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
+                result = self.plugin.inject("test-vm", "default", False)
+
+        self.assertEqual(result, 1)
+        self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
+            group="kubevirt.io",
+            version="v1",
+            namespace="default",
+            plural="virtualmachineinstances",
+            name="test-vm"
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()
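
Example usage: a filled-in copy of scenarios/kubevirt/kubevirt-vm-outage.yaml might look like the sketch below; vm_name and namespace are placeholders for a VMI that already exists in the target cluster, and disable_auto_restart is the optional flag read by the plugin (it defaults to false).

scenarios:
  - name: "kubevirt outage test"
    scenario: kubevirt_vm_outage
    parameters:
      vm_name: my-vm               # placeholder: name of an existing VMI
      namespace: my-namespace      # placeholder: namespace of the VMI
      timeout: 60                  # seconds to wait for the VMI to return to Running
      disable_auto_restart: false  # set true to stop the VM controller from recreating the VMI

With the kubevirt_vm_outage entry added to config/config.yaml above, krkn picks up this scenario file on its next run.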