Kubevirt VM outage tests with improved mocking and validation scenarios at test_kubevirt_vm_outage.py (#1041)

* Kubevirt VM outage tests with improved mocking and validation scenarios at test_kubevirt_vm_outage.py

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* Refactor Kubevirt VM outage tests to improve time mocking and response handling

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* Remove unused subproject reference for pvc_outage

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* Refactor Kubevirt VM outage tests to enhance time mocking and improve response handling

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* Enhance VMI deletion test by mocking unchanged creationTimestamp to exercise timeout path

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* Refactor Kubevirt VM outage tests to use dynamic timestamps and improve mock handling

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

---------

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>
Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
This commit is contained in:
Sai Sanjay
2026-01-02 13:47:13 +00:00
committed by GitHub
parent ce52183a26
commit 10b6e4663e

View File

@@ -1,11 +1,17 @@
import unittest
import itertools
import time
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import yaml
from kubernetes.client.rest import ApiException
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import tempfile
import os
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
@@ -17,12 +23,13 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
self.plugin.custom_object_client = self.custom_object_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
@@ -36,10 +43,24 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
base_time = datetime.now(timezone.utc)
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
"namespace": "default",
"creationTimestamp": base_time.isoformat() + "Z"
},
"status": {
"phase": "Running"
}
}
# Mock VMI with new creation timestamp (after recreation)
self.mock_vmi_recreated = {
"metadata": {
"name": "test-vm",
"namespace": "default",
"creationTimestamp": (base_time + timedelta(minutes=1)).isoformat() + "Z"
},
"status": {
"phase": "Running"
@@ -62,7 +83,6 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
}
# Create a temporary config file
import tempfile, os
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
@@ -72,38 +92,138 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
def create_incrementing_time_function(self):
"""
Create an incrementing time function that returns sequential float values.
Returns a callable that can be used with patch('time.time', side_effect=...)
"""
counter = itertools.count(1)
def mock_time():
return float(next(counter))
return mock_time
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject and recover to simulate success
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
# Mock list_namespaces_by_regex to return a single namespace
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
# Mock list_namespaced_custom_object to return our VMI
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=[
{"items": [self.mock_vmi]}, # For get_vmis
{"items": [{"metadata": {"name": "test-vm"}}]}, # For validate_environment
]
)
# Mock get_namespaced_custom_object with a sequence that handles multiple calls
# Call sequence:
# 1. validate_environment: get original VMI
# 2. execute_scenario: get VMI before deletion
# 3. delete_vmi: loop checking if timestamp changed (returns recreated VMI on first check)
# 4+. wait_for_running: loop until phase is Running (may call multiple times)
get_vmi_responses = [
self.mock_vmi, # Initial get in validate_environment
self.mock_vmi, # Get before delete
self.mock_vmi_recreated, # After delete (recreated with new timestamp)
self.mock_vmi_recreated, # Check if running
]
class GetVmiSideEffect:
"""
Callable helper that returns a predefined sequence of VMIs.
If called more times than there are responses, it fails the test
to surface unexpected additional calls instead of silently
masking them.
"""
def __init__(self, responses):
self._responses = responses
self._call_iter = itertools.count()
self.call_count = 0
def __call__(self, *args, **kwargs):
call_num = next(self._call_iter)
self.call_count = call_num + 1
if call_num >= len(self._responses):
raise AssertionError(
f"get_vmi_side_effect called more times ({call_num + 1}) "
f"than expected ({len(self._responses)})."
)
return self._responses[call_num]
get_vmi_side_effect = GetVmiSideEffect(get_vmi_responses)
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=get_vmi_side_effect
)
# Mock delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
with patch('time.time', side_effect=self.create_incrementing_time_function()), patch('time.sleep'):
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_called_once_with("test-vm", "default", False)
# Verify get_namespaced_custom_object was called exactly as many times as
# there are predefined responses; get_vmi_side_effect will raise if any
# additional unexpected calls are made.
self.assertEqual(
self.custom_object_client.get_namespaced_custom_object.call_count,
len(get_vmi_responses),
)
# Verify that the VMI delete operation was performed once with expected parameters.
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm",
)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject to simulate failure
with patch.object(self.plugin, 'inject', return_value=1) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
# Mock list_namespaces_by_regex
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
# Mock list to return VMI
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=[
{"items": [self.mock_vmi]}, # For get_vmis
{"items": [{"metadata": {"name": "test-vm"}}]}, # For validate_environment
]
)
# Mock get_vmi
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=[
self.mock_vmi, # validate_environment
self.mock_vmi, # get before delete
]
)
# Mock delete to raise an error
self.custom_object_client.delete_namespaced_custom_object = MagicMock(
side_effect=ApiException(status=500, reason="Internal Server Error")
)
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_not_called()
# Verify delete was attempted before the error occurred
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
def test_disable_auto_restart(self):
"""
@@ -111,31 +231,53 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Mock list_namespaces_by_regex
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
# Mock VM object for patching
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {}
"spec": {"running": True}
}
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock VM patch operation
with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm:
mock_patch_vm.return_value = True
# Mock inject and recover
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
# Mock list to return VMI
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=[
{"items": [self.mock_vmi]}, # For get_vmis
{"items": [{"metadata": {"name": "test-vm"}}]}, # For validate_environment
]
)
# Mock get_namespaced_custom_object with detailed call sequence.
# NOTE: validate_environment uses list_namespaced_custom_object earlier in the flow;
# this mock only covers subsequent get_namespaced_custom_object calls.
# It handles both VMI (virtualmachineinstances) and VM (virtualmachines) resources.
# Call sequence for get_namespaced_custom_object:
# 1. execute_scenario: get VMI before deletion (plural=virtualmachineinstances)
# 2. patch_vm_spec: get VM for patching (plural=virtualmachines)
# 3. delete_vmi: loop checking if VMI timestamp changed (plural=virtualmachineinstances)
# 4+. wait_for_running: loop until VMI phase is Running (plural=virtualmachineinstances)
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=[
self.mock_vmi, # Call 1: get VMI before delete
mock_vm, # Call 2: get VM for patching (different resource type)
self.mock_vmi_recreated, # Call 3: delete_vmi detects new timestamp
self.mock_vmi_recreated, # Call 4: wait_for_running checks phase
]
)
# Mock patch and delete operations
self.custom_object_client.patch_namespaced_custom_object = MagicMock(return_value=mock_vm)
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
with patch('time.time', side_effect=self.create_incrementing_time_function()), patch('time.sleep'):
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# Should call patch_vm_spec to disable auto-restart
mock_patch_vm.assert_any_call("test-vm", "default", False)
# Should call patch_vm_spec to re-enable auto-restart during recovery
mock_patch_vm.assert_any_call("test-vm", "default", True)
mock_inject.assert_called_once_with("test-vm", "default", True)
mock_recover.assert_called_once_with("test-vm", "default", True)
# Verify patch was called to disable auto-restart
self.custom_object_client.patch_namespaced_custom_object.assert_called()
def test_recovery_when_vmi_does_not_exist(self):
"""
@@ -143,64 +285,102 @@ class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
"""
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Initialize affected_pod which is used by wait_for_running
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"metadata": {
"name": "test-vm",
"namespace": "default",
"creationTimestamp": (datetime.now(timezone.utc) + timedelta(minutes=2)).isoformat() + "Z"
},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
# Mock get_namespaced_custom_object call sequence triggered during recovery.
# Call sequence:
# 1. wait_for_running (line 391): first loop iteration - VMI creation has been
# requested by recover(), but the VMI may not yet be visible in the API.
# In this case, get_vmi/get_namespaced_custom_object can legitimately return 404.
# 2. wait_for_running: subsequent loop iterations - VMI exists and is running.
# Note: recover() itself does NOT call get_vmi before creating the VMI. It only
# checks if self.original_vmi exists, then directly calls create_namespaced_custom_object.
# All get_vmi calls (and thus the simulated 404 followed by a Running VMI) happen
# within wait_for_running *after* the create call has been issued.
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=[
ApiException(status=404, reason="Not Found"), # wait_for_running: VMI not visible yet after create
running_vmi, # wait_for_running: VMI now exists and is running
]
)
# Mock the create API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time
with patch('time.time', side_effect=self.create_incrementing_time_function()), patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments
self.custom_object_client.create_namespaced_custom_object.assert_called_once()
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Mock empty CRD list (no KubeVirt CRDs)
empty_crd_list = MagicMock()
empty_crd_list.items = []
self.k8s_client.list_custom_resource_definition.return_value = empty_crd_list
# Mock list_namespaces_by_regex
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
# Mock list to return VMI for get_vmis
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=[
{"items": [self.mock_vmi]}, # For get_vmis
{"items": []}, # For validate_environment - empty result simulating no KubeVirt CRDs
]
)
# Mock get_vmi to return VMI
self.custom_object_client.get_namespaced_custom_object = MagicMock(
return_value=self.mock_vmi
)
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Store original VMI
self.plugin.original_vmi = self.mock_vmi
# Initialize required attributes
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
self.plugin.pods_status = PodsStatus()
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI (never gets deleted)
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.inject("test-vm", "default", False)
# Mock get_vmi to always return the same VMI with unchanged creationTimestamp.
# This simulates a scenario where the VMI has NOT been recreated after deletion
# (i.e., still has the original creationTimestamp from before deletion).
# delete_vmi (lines 315-320) loops checking if creationTimestamp changed to detect
# successful recreation. Since it never changes here, the loop continues until
# the mocked time exceeds the timeout value, exercising the timeout path.
self.custom_object_client.get_namespaced_custom_object = MagicMock(
return_value=self.mock_vmi
)
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 140]):
result = self.plugin.delete_vmi("test-vm", "default", False, timeout=120)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",