adding virt checker tests (#960)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 3m45s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped

Signed-off-by: Paige Patton <prubenda@redhat.com>
This commit is contained in:
Paige Patton
2025-11-18 14:27:59 -05:00
committed by GitHub
parent d0c604a516
commit a373dcf453
22 changed files with 2375 additions and 1 deletions

View File

@@ -1,4 +1,4 @@
[run]
omit =
tests/*
krkn/tests/**
krkn/tests/**

View File

@@ -0,0 +1,37 @@
import tempfile
import unittest
from krkn.scenario_plugins.native.run_python_plugin import (
RunPythonFileInput,
run_python_file,
)
class RunPythonPluginTest(unittest.TestCase):
def test_success_execution(self):
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(bytes("print('Hello world!')", "utf-8"))
tmp_file.flush()
output_id, output_data = run_python_file(
params=RunPythonFileInput(tmp_file.name),
run_id="test-python-plugin-success",
)
self.assertEqual("success", output_id)
self.assertEqual("Hello world!\n", output_data.stdout)
def test_error_execution(self):
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(
bytes("import sys\nprint('Hello world!')\nsys.exit(42)\n", "utf-8")
)
tmp_file.flush()
output_id, output_data = run_python_file(
params=RunPythonFileInput(tmp_file.name), run_id="test-python-plugin-error"
)
self.assertEqual("error", output_id)
self.assertEqual(42, output_data.exit_code)
self.assertEqual("Hello world!\n", output_data.stdout)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ApplicationOutageScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_application_outage_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.application_outage.application_outage_scenario_plugin import ApplicationOutageScenarioPlugin
class TestApplicationOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ApplicationOutageScenarioPlugin
"""
self.plugin = ApplicationOutageScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["application_outages_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ContainerScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_container_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.container.container_scenario_plugin import ContainerScenarioPlugin
class TestContainerScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ContainerScenarioPlugin
"""
self.plugin = ContainerScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["container_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,503 @@
#!/usr/bin/env python3
"""
Test suite for HealthChecker class
This test file provides comprehensive coverage for the main functionality of HealthChecker:
- HTTP request making with various authentication methods
- Health check monitoring with status tracking
- Failure detection and recovery tracking
- Exit on failure behavior
- Telemetry collection
Usage:
python -m coverage run -a -m unittest tests/test_health_checker.py -v
Assisted By: Claude Code
"""
import queue
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from krkn_lib.models.telemetry.models import HealthCheck
from krkn.utils.HealthChecker import HealthChecker
class TestHealthChecker(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for HealthChecker
"""
self.checker = HealthChecker(iterations=5)
self.health_check_queue = queue.Queue()
def tearDown(self):
"""
Clean up after each test
"""
self.checker.current_iterations = 0
self.checker.ret_value = 0
def make_increment_side_effect(self, response_data):
"""
Helper to create a side effect that increments current_iterations
"""
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
return response_data
return side_effect
@patch('requests.get')
def test_make_request_success(self, mock_get):
"""
Test make_request returns success for 200 status code
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
result = self.checker.make_request("http://example.com")
self.assertEqual(result["url"], "http://example.com")
self.assertEqual(result["status"], True)
self.assertEqual(result["status_code"], 200)
mock_get.assert_called_once_with(
"http://example.com",
auth=None,
headers=None,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_with_auth(self, mock_get):
"""
Test make_request with basic authentication
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
auth = ("user", "pass")
result = self.checker.make_request("http://example.com", auth=auth)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"http://example.com",
auth=auth,
headers=None,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_with_bearer_token(self, mock_get):
"""
Test make_request with bearer token authentication
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
headers = {"Authorization": "Bearer token123"}
result = self.checker.make_request("http://example.com", headers=headers)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"http://example.com",
auth=None,
headers=headers,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_failure(self, mock_get):
"""
Test make_request returns failure for non-200 status code
"""
mock_response = MagicMock()
mock_response.status_code = 500
mock_get.return_value = mock_response
result = self.checker.make_request("http://example.com")
self.assertEqual(result["status"], False)
self.assertEqual(result["status_code"], 500)
@patch('requests.get')
def test_make_request_with_verify_false(self, mock_get):
"""
Test make_request with SSL verification disabled
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
result = self.checker.make_request("https://example.com", verify=False)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"https://example.com",
auth=None,
headers=None,
verify=False,
timeout=3
)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_empty_config(self, mock_sleep, mock_make_request):
"""
Test run_health_check with empty config skips checks
"""
config = {
"config": [],
"interval": 2
}
self.checker.run_health_check(config, self.health_check_queue)
mock_make_request.assert_not_called()
self.assertTrue(self.health_check_queue.empty())
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_successful_requests(self, mock_sleep, mock_make_request):
"""
Test run_health_check with all successful requests
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 2
self.checker.run_health_check(config, self.health_check_queue)
# Should have telemetry
self.assertFalse(self.health_check_queue.empty())
telemetry = self.health_check_queue.get()
self.assertEqual(len(telemetry), 1)
self.assertEqual(telemetry[0].status, True)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_failure_then_recovery(self, mock_sleep, mock_make_request):
"""
Test run_health_check detects failure and recovery
"""
# Create side effects that increment and return different values
call_count = [0]
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
call_count[0] += 1
if call_count[0] == 1:
return {"url": "http://example.com", "status": False, "status_code": 500}
else:
return {"url": "http://example.com", "status": True, "status_code": 200}
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 3
self.checker.run_health_check(config, self.health_check_queue)
# Should have telemetry showing failure period
self.assertFalse(self.health_check_queue.empty())
telemetry = self.health_check_queue.get()
# Should have at least 2 entries: one for failure period, one for success period
self.assertGreaterEqual(len(telemetry), 1)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_bearer_token(self, mock_sleep, mock_make_request):
"""
Test run_health_check correctly handles bearer token
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": "test-token-123",
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify bearer token was added to headers
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][2]['Authorization'], "Bearer test-token-123")
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_auth(self, mock_sleep, mock_make_request):
"""
Test run_health_check correctly handles basic auth
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": "user,pass",
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify auth tuple was created correctly
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][1], ("user", "pass"))
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exit_on_failure(self, mock_sleep, mock_make_request):
"""
Test run_health_check sets ret_value=2 when exit_on_failure is True
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": False,
"status_code": 500
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": True
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# ret_value should be set to 2 on failure
self.assertEqual(self.checker.ret_value, 2)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exit_on_failure_not_set_on_success(self, mock_sleep, mock_make_request):
"""
Test run_health_check does not set ret_value when request succeeds
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": True
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# ret_value should remain 0 on success
self.assertEqual(self.checker.ret_value, 0)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_verify_url_false(self, mock_sleep, mock_make_request):
"""
Test run_health_check respects verify_url setting
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "https://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "https://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False,
"verify_url": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify that verify parameter was set to False
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][3], False)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exception_handling(self, mock_sleep, mock_make_request):
"""
Test run_health_check handles exceptions during requests
"""
# Simulate exception during request but also increment to avoid infinite loop
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
raise Exception("Connection error")
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
# Should not raise exception
self.checker.run_health_check(config, self.health_check_queue)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_multiple_urls(self, mock_sleep, mock_make_request):
"""
Test run_health_check with multiple URLs
"""
call_count = [0]
def side_effect(*args, **kwargs):
call_count[0] += 1
# Increment only after both URLs are called (one iteration)
if call_count[0] % 2 == 0:
self.checker.current_iterations += 1
return {
"status": True,
"status_code": 200
}
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example1.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
},
{
"url": "http://example2.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Should have called make_request for both URLs
self.assertEqual(mock_make_request.call_count, 2)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_custom_interval(self, mock_sleep, mock_make_request):
"""
Test run_health_check uses custom interval
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 5
}
self.checker.iterations = 2
self.checker.run_health_check(config, self.health_check_queue)
# Verify sleep was called with custom interval
mock_sleep.assert_called_with(5)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for HogsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_hogs_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.hogs.hogs_scenario_plugin import HogsScenarioPlugin
class TestHogsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for HogsScenarioPlugin
"""
self.plugin = HogsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["hog_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,696 @@
#!/usr/bin/env python3
"""
Test suite for KubeVirt VM Outage Scenario Plugin class
Note: This test file uses mocks extensively to avoid needing actual Kubernetes/KubeVirt infrastructure.
Usage:
python -m coverage run -a -m unittest tests/test_kubevirt_vm_outage.py -v
Assisted By: Claude Code
"""
import copy
import itertools
import os
import tempfile
import unittest
from unittest.mock import MagicMock, patch
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kubernetes.client.rest import ApiException
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
self.plugin.custom_object_client = self.custom_object_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
# Mock custom resource definition list with KubeVirt CRDs
crd_list = MagicMock()
crd_item = MagicMock()
crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io"
crd_list.items = [crd_item]
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
},
"status": {
"phase": "Running"
}
}
# Create test config
self.config = {
"scenarios": [
{
"name": "kubevirt outage test",
"scenario": "kubevirt_vm_outage",
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"duration": 0
}
}
]
}
# Create a temporary config file
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
yaml.dump(self.config, f)
# Mock dependencies
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
# Initialize counters for reusable mock functions
self.delete_count = 0
self.wait_count = 0
def mock_delete(self, *args, **kwargs):
self.delete_count += 1
self.plugin.affected_pod = AffectedPod(pod_name=f"test-vm-{self.delete_count}", namespace="default")
self.plugin.affected_pod.pod_rescheduling_time = 5.0
return 0
def mock_wait(self, *args, **kwargs):
self.wait_count += 1
self.plugin.affected_pod.pod_readiness_time = 3.0
return 0
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi and wait_for_running to simulate success
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_delete.assert_called_once_with("test-vm", "default", False)
mock_wait.assert_called_once_with("test-vm", "default", 60)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi to simulate failure
with patch.object(self.plugin, 'delete_vmi', return_value=1) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', return_value=0) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_delete.assert_called_once_with("test-vm", "default", False)
mock_wait.assert_not_called()
def test_disable_auto_restart(self):
"""
Test VM auto-restart can be disabled
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi and wait_for_running
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# delete_vmi should be called with disable_auto_restart=True
mock_delete.assert_called_once_with("test-vm", "default", True)
mock_wait.assert_called_once_with("test-vm", "default", 60)
def test_recovery_when_vmi_does_not_exist(self):
"""
Test recovery logic when VMI does not exist after deletion
"""
# Initialize the plugin's custom_object_client
self.plugin.custom_object_client = self.custom_object_client
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
)
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return False (KubeVirt not installed)
with patch.object(self.plugin, 'validate_environment', return_value=False):
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
# When validation fails, run() returns 1 due to exception handling
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Initialize the plugin's custom_object_client and required attributes
self.plugin.custom_object_client = self.custom_object_client
# Initialize original_vmi which is required by delete_vmi
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
# Initialize pods_status which delete_vmi needs
from krkn_lib.models.k8s import PodsStatus, AffectedPod
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI with same creationTimestamp (never gets recreated)
mock_vmi_with_time = self.mock_vmi.copy()
mock_vmi_with_time['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
with patch.object(self.plugin, 'get_vmi', return_value=mock_vmi_with_time):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
def test_get_vmi_api_exception_non_404(self):
"""
Test get_vmi raises ApiException for non-404 errors
"""
# Mock API exception with non-404 status
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error)
with self.assertRaises(ApiException):
self.plugin.get_vmi("test-vm", "default")
def test_get_vmi_general_exception(self):
"""
Test get_vmi raises general exceptions
"""
# Mock general exception
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
with self.assertRaises(Exception):
self.plugin.get_vmi("test-vm", "default")
def test_get_vmis_with_regex_matching(self):
"""
Test get_vmis successfully filters VMIs by regex pattern
"""
# Mock namespace list
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default", "test-ns"])
# Mock VMI list with multiple VMIs
vmi_list = {
"items": [
{"metadata": {"name": "test-vm-1"}, "status": {"phase": "Running"}},
{"metadata": {"name": "test-vm-2"}, "status": {"phase": "Running"}},
{"metadata": {"name": "other-vm"}, "status": {"phase": "Running"}},
]
}
self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=vmi_list)
# Test with regex pattern that matches test-vm-*
self.plugin.get_vmis("test-vm-.*", "default")
# Should have 4 VMs (2 per namespace * 2 namespaces)
self.assertEqual(len(self.plugin.vmis_list), 4)
# Verify only test-vm-* were added
for vmi in self.plugin.vmis_list:
self.assertTrue(vmi["metadata"]["name"].startswith("test-vm-"))
def test_get_vmis_api_exception_404(self):
"""
Test get_vmis handles 404 ApiException gracefully
"""
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error)
# Should not raise, returns empty list
result = self.plugin.get_vmis("test-vm", "default")
self.assertEqual(result, [])
def test_get_vmis_api_exception_non_404(self):
"""
Test get_vmis raises ApiException for non-404 errors
"""
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error)
with self.assertRaises(ApiException):
self.plugin.get_vmis("test-vm", "default")
def test_patch_vm_spec_success(self):
"""
Test patch_vm_spec successfully patches VM
"""
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {"running": True}
}
self.custom_object_client.get_namespaced_custom_object = MagicMock(return_value=mock_vm)
self.custom_object_client.patch_namespaced_custom_object = MagicMock(return_value=mock_vm)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertTrue(result)
self.custom_object_client.patch_namespaced_custom_object.assert_called_once()
def test_patch_vm_spec_api_exception(self):
"""
Test patch_vm_spec handles ApiException
"""
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertFalse(result)
def test_patch_vm_spec_general_exception(self):
"""
Test patch_vm_spec handles general exceptions
"""
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertFalse(result)
def test_delete_vmi_api_exception_404(self):
"""
Test delete_vmi handles 404 ApiException during deletion
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
def test_delete_vmi_api_exception_non_404(self):
"""
Test delete_vmi handles non-404 ApiException during deletion
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
def test_delete_vmi_successful_recreation(self):
"""
Test delete_vmi succeeds when VMI is recreated with new creationTimestamp
"""
# Initialize required attributes - use deepcopy to avoid shared references
self.plugin.original_vmi = copy.deepcopy(self.mock_vmi)
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock get_vmi to return VMI with new creationTimestamp - use deepcopy
new_vmi = copy.deepcopy(self.mock_vmi)
new_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:05:00Z'
# Use itertools to create an infinite iterator for time values
time_iter = itertools.count(0, 0.001)
with patch.object(self.plugin, 'get_vmi', return_value=new_vmi):
with patch('time.sleep'):
with patch('time.time', side_effect=lambda: next(time_iter)):
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 0)
self.assertIsNotNone(self.plugin.affected_pod.pod_rescheduling_time)
def test_delete_vmi_with_disable_auto_restart_failure(self):
"""
Test delete_vmi continues when patch_vm_spec fails and VMI stays deleted
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock patch_vm_spec to fail
with patch.object(self.plugin, 'patch_vm_spec', return_value=False):
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock VMI deleted (returns None) - it will timeout waiting for recreation
with patch.object(self.plugin, 'get_vmi', return_value=None):
with patch('time.sleep'):
# Use itertools to create infinite time sequence
# Use 1.0 increment to quickly reach timeout (120 seconds)
time_iter = itertools.count(0, 1.0)
with patch('time.time', side_effect=lambda: next(time_iter)):
result = self.plugin.delete_vmi("test-vm", "default", True)
# When VMI stays deleted (None), delete_vmi waits for recreation and times out
self.assertEqual(result, 1)
def test_wait_for_running_timeout(self):
"""
Test wait_for_running times out when VMI doesn't reach Running state
"""
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock VMI in Pending state
pending_vmi = self.mock_vmi.copy()
pending_vmi['status']['phase'] = 'Pending'
with patch.object(self.plugin, 'get_vmi', return_value=pending_vmi):
with patch('time.sleep'):
with patch('time.time', side_effect=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 121]):
result = self.plugin.wait_for_running("test-vm", "default", 120)
self.assertEqual(result, 1)
def test_wait_for_running_vmi_not_exists(self):
"""
Test wait_for_running when VMI doesn't exist yet
"""
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# First return None (not exists), then return running VMI
running_vmi = self.mock_vmi.copy()
running_vmi['status']['phase'] = 'Running'
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
with patch('time.sleep'):
# time.time() called: start_time (0), while loop iteration 1 (1), iteration 2 (2), iteration 3 (3), end_time (3)
with patch('time.time', side_effect=[0, 1, 2, 3, 3]):
result = self.plugin.wait_for_running("test-vm", "default", 120)
self.assertEqual(result, 0)
self.assertIsNotNone(self.plugin.affected_pod.pod_readiness_time)
def test_recover_no_original_vmi(self):
"""
Test recover fails when no original VMI is captured
"""
self.plugin.original_vmi = None
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 1)
def test_recover_exception_during_creation(self):
"""
Test recover handles exception during VMI creation
"""
self.plugin.original_vmi = self.mock_vmi.copy()
self.custom_object_client.create_namespaced_custom_object = MagicMock(
side_effect=Exception("Creation failed")
)
with patch.object(self.plugin, 'get_vmi', return_value=None):
with patch('time.sleep'):
with patch('time.time', side_effect=[0, 301]):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 1)
def test_execute_scenario_missing_vm_name(self):
"""
Test execute_scenario fails when vm_name is missing
"""
config = {
"parameters": {
"namespace": "default"
}
}
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_execute_scenario_vmi_not_found(self):
"""
Test execute_scenario when VMI is not found after get_vmi
"""
self.plugin.vmis_list = [self.mock_vmi]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default"
}
}
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
# First get_vmi returns VMI, second returns None
with patch.object(self.plugin, 'get_vmi', side_effect=[self.mock_vmi, None]):
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should be PodsStatus with unrecovered pod
self.assertIsInstance(result, type(self.plugin.pods_status))
def test_execute_scenario_with_kill_count(self):
"""
Test execute_scenario with kill_count > 1
"""
# Create multiple VMIs
vmi_1 = self.mock_vmi.copy()
vmi_1["metadata"]["name"] = "test-vm-1"
vmi_2 = self.mock_vmi.copy()
vmi_2["metadata"]["name"] = "test-vm-2"
self.plugin.vmis_list = [vmi_1, vmi_2]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"kill_count": 2
}
}
# Reset counters
self.delete_count = 0
self.wait_count = 0
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
with patch.object(self.plugin, 'get_vmi', side_effect=[vmi_1, vmi_2]):
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_del:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wt:
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should call delete_vmi and wait_for_running twice
self.assertEqual(mock_del.call_count, 2)
self.assertEqual(mock_wt.call_count, 2)
def test_execute_scenario_wait_for_running_failure(self):
"""
Test execute_scenario when wait_for_running fails
"""
self.plugin.vmis_list = [self.mock_vmi]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default"
}
}
def mock_delete(*args, **kwargs):
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
self.plugin.affected_pod.pod_rescheduling_time = 5.0
return 0
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
with patch.object(self.plugin, 'delete_vmi', side_effect=mock_delete):
with patch.object(self.plugin, 'wait_for_running', return_value=1):
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should have unrecovered pod
self.assertEqual(len(result.unrecovered), 1)
def test_validate_environment_exception(self):
"""
Test validate_environment handles exceptions
"""
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
result = self.plugin.validate_environment("test-vm", "default")
self.assertFalse(result)
def test_validate_environment_vmi_not_found(self):
"""
Test validate_environment when VMI doesn't exist
"""
# Mock CRDs exist
mock_crd_list = MagicMock()
mock_crd_list.items = MagicMock(return_value=["item1"])
self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=mock_crd_list)
# Mock VMI not found
with patch.object(self.plugin, 'get_vmi', return_value=None):
result = self.plugin.validate_environment("test-vm", "default")
self.assertFalse(result)
def test_init_clients(self):
"""
Test init_clients initializes k8s client correctly
"""
mock_k8s = MagicMock(spec=KrknKubernetes)
mock_custom_client = MagicMock()
mock_k8s.custom_object_client = mock_custom_client
self.plugin.init_clients(mock_k8s)
self.assertEqual(self.plugin.k8s_client, mock_k8s)
self.assertEqual(self.plugin.custom_object_client, mock_custom_client)
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["kubevirt_vm_outage"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ManagedClusterScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_managed_cluster_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.managed_cluster.managed_cluster_scenario_plugin import ManagedClusterScenarioPlugin
class TestManagedClusterScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ManagedClusterScenarioPlugin
"""
self.plugin = ManagedClusterScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["managedcluster_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NativeScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_native_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.native.native_scenario_plugin import NativeScenarioPlugin
class TestNativeScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NativeScenarioPlugin
"""
self.plugin = NativeScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario types
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pod_network_scenarios", "ingress_node_scenarios"])
self.assertEqual(len(result), 2)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NetworkChaosNgScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_network_chaos_ng_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.network_chaos_ng_scenario_plugin import NetworkChaosNgScenarioPlugin
class TestNetworkChaosNgScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NetworkChaosNgScenarioPlugin
"""
self.plugin = NetworkChaosNgScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["network_chaos_ng_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NetworkChaosScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_network_chaos_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos.network_chaos_scenario_plugin import NetworkChaosScenarioPlugin
class TestNetworkChaosScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NetworkChaosScenarioPlugin
"""
self.plugin = NetworkChaosScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["network_chaos_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NodeActionsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_node_actions_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.node_actions.node_actions_scenario_plugin import NodeActionsScenarioPlugin
class TestNodeActionsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NodeActionsScenarioPlugin
"""
self.plugin = NodeActionsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["node_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for PodDisruptionScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_pod_disruption_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.pod_disruption.pod_disruption_scenario_plugin import PodDisruptionScenarioPlugin
class TestPodDisruptionScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for PodDisruptionScenarioPlugin
"""
self.plugin = PodDisruptionScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pod_disruption_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for PvcScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_pvc_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.pvc.pvc_scenario_plugin import PvcScenarioPlugin
class TestPvcScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for PvcScenarioPlugin
"""
self.plugin = PvcScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pvc_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
Test suite for ServiceDisruptionScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_service_disruption_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.service_disruption.service_disruption_scenario_plugin import ServiceDisruptionScenarioPlugin
class TestServiceDisruptionScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ServiceDisruptionScenarioPlugin
"""
self.plugin = ServiceDisruptionScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["service_disruption_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for ServiceHijackingScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_service_hijacking_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.service_hijacking.service_hijacking_scenario_plugin import ServiceHijackingScenarioPlugin
class TestServiceHijackingScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ServiceHijackingScenarioPlugin
"""
self.plugin = ServiceHijackingScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["service_hijacking_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for ShutDownScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_shut_down_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.shut_down.shut_down_scenario_plugin import ShutDownScenarioPlugin
class TestShutDownScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ShutDownScenarioPlugin
"""
self.plugin = ShutDownScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["cluster_shut_down_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for SynFloodScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_syn_flood_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.syn_flood.syn_flood_scenario_plugin import SynFloodScenarioPlugin
class TestSynFloodScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for SynFloodScenarioPlugin
"""
self.plugin = SynFloodScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["syn_flood_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for TimeActionsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_time_actions_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.time_actions.time_actions_scenario_plugin import TimeActionsScenarioPlugin
class TestTimeActionsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for TimeActionsScenarioPlugin
"""
self.plugin = TimeActionsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["time_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

511
tests/test_virt_checker.py Normal file
View File

@@ -0,0 +1,511 @@
#!/usr/bin/env python3
"""
Test suite for VirtChecker class
This test file provides comprehensive coverage for the main functionality of VirtChecker:
- Initialization with various configurations
- VM access checking (both virtctl and disconnected modes)
- Disconnected mode with IP/node changes
- Thread management
- Post-check validation
Usage:
python -m coverage run -a -m unittest tests/test_virt_checker.py -v
Note: This test file uses mocks extensively to avoid needing actual Kubernetes/KubeVirt infrastructure.
Created By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, patch
import sys
from krkn.utils.VirtChecker import VirtChecker
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
# Create a mock VirtCheck class before any imports
class MockVirtCheck:
"""Mock VirtCheck class for testing"""
def __init__(self, data):
self.vm_name = data.get('vm_name', '')
self.ip_address = data.get('ip_address', '')
self.namespace = data.get('namespace', '')
self.node_name = data.get('node_name', '')
self.new_ip_address = data.get('new_ip_address', '')
self.status = data.get('status', False)
self.start_timestamp = data.get('start_timestamp', '')
self.end_timestamp = data.get('end_timestamp', '')
self.duration = data.get('duration', 0)
class TestVirtChecker(unittest.TestCase):
"""Test suite for VirtChecker class"""
def setUp(self):
"""Set up test fixtures before each test method"""
self.mock_krkn_lib = MagicMock()
# Mock VMI data
self.mock_vmi_1 = {
"metadata": {"name": "test-vm-1", "namespace": "test-namespace"},
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
self.mock_vmi_2 = {
"metadata": {"name": "test-vm-2", "namespace": "test-namespace"},
"status": {
"nodeName": "worker-2",
"interfaces": [{"ipAddress": "192.168.1.11"}]
}
}
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_init_with_empty_namespace(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with empty namespace (should skip checks)"""
def yaml_getter(config, key, default):
if key == "namespace":
return ""
return default
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": ""},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
# Should set batch_size to 0 and not initialize plugin
self.assertEqual(checker.batch_size, 0)
mock_plugin_class.assert_not_called()
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_regex_namespace(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with regex namespace pattern"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": "test-*"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(len(checker.vm_list), 0)
self.assertEqual(len(checker.vm_list), 2)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_with_node_name(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with specific VM names"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
# Test with VM name pattern
checker = VirtChecker(
{"namespace": "test-namespace", "name": "test-vm-.*"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker.batch_size, 0)
self.assertEqual(len(checker.vm_list), 2)
# Test with specific VM name
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
checker2 = VirtChecker(
{"namespace": "test-namespace", "name": "test-vm-1"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker2.batch_size, 0)
self.assertEqual(len(checker2.vm_list), 1)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_with_regex_name(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization filtering by node names"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
# Test filtering by node name - should only include VMs on worker-2
checker = VirtChecker(
{"namespace": "test-namespace", "node_names": "worker-2"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker.batch_size, 0)
# Only test-vm-2 is on worker-2, so vm_list should have 1 VM
self.assertEqual(len(checker.vm_list), 1)
self.assertEqual(checker.vm_list[0].vm_name, "test-vm-2")
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_get_vm_access_success(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test get_vm_access returns True when VM is accessible"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock successful access
mock_invoke.return_value = "True"
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result = checker.get_vm_access("test-vm", "test-namespace")
self.assertTrue(result)
# Should try first command and succeed
self.assertGreaterEqual(mock_invoke.call_count, 1)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_get_vm_access_failure(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test get_vm_access returns False when VM is not accessible"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed access
mock_invoke.return_value = "False"
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result = checker.get_vm_access("test-vm", "test-namespace")
self.assertFalse(result)
# Should try both commands
self.assertEqual(mock_invoke.call_count, 2)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_success(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access with successful connection"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock successful disconnected access
mock_invoke.side_effect = ["some output", "True"]
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertIsNone(new_ip)
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_new_ip(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access when VM has new IP address"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed first attempt, successful second with new IP
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.20"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.20")
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_new_node(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access when VM moved to new node"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed attempts, successful on new node
# Call sequence: debug_check, initial_check, check_on_new_node
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-2",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.10")
self.assertEqual(new_node, "worker-2")
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_ssh_node_fallback(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access falls back to ssh_node"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
elif key == "ssh_node":
return "worker-0"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed attempts on original node, successful on ssh_node fallback
# Call sequence: debug_check, initial_check_on_worker-1, fallback_check_on_ssh_node
# Since IP and node haven't changed, it goes directly to ssh_node fallback
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns", "ssh_node": "worker-0"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.10")
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_thread_join(self, mock_plugin_class, mock_yaml):
"""Test thread_join waits for all threads"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
# Create mock threads
mock_thread_1 = MagicMock()
mock_thread_2 = MagicMock()
checker.threads = [mock_thread_1, mock_thread_2]
checker.thread_join()
mock_thread_1.join.assert_called_once()
mock_thread_2.join.assert_called_once()
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_init_exception_handling(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker handles exceptions during initialization"""
mock_plugin = MagicMock()
mock_plugin.init_clients.side_effect = Exception("Connection error")
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
config = {"namespace": "test-ns"}
# Should not raise exception
checker = VirtChecker(
config,
iterations=1,
krkn_lib=self.mock_krkn_lib
)
# VM list should be empty due to exception
self.assertEqual(len(checker.vm_list), 0)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_batch_size_calculation(self, mock_plugin_class, mock_yaml):
"""Test batch size calculation based on VM count and thread limit"""
mock_plugin = MagicMock()
# Create 25 mock VMIs
mock_vmis = []
for i in range(25):
vmi = {
"metadata": {"name": f"vm-{i}", "namespace": "test-ns"},
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": f"192.168.1.{i}"}]
}
}
mock_vmis.append(vmi)
mock_plugin.vmis_list = mock_vmis
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
elif key == "node_names":
return ""
return default
mock_yaml.side_effect = yaml_getter
config = {"namespace": "test-ns"}
checker = VirtChecker(
config,
iterations=5,
krkn_lib=self.mock_krkn_lib,
threads_limit=10
)
# 25 VMs / 10 threads = 3 VMs per batch (ceiling)
self.assertEqual(checker.batch_size, 3)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ZoneOutageScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_zone_outage_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.zone_outage.zone_outage_scenario_plugin import ZoneOutageScenarioPlugin
class TestZoneOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ZoneOutageScenarioPlugin
"""
self.plugin = ZoneOutageScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["zone_outages_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()