Files
krkn/tests/test_abstract_node_scenarios.py
Paige Patton ff3c4f5313 increasing node action coverage (#1010)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-12-22 11:36:10 -05:00

416 lines
16 KiB
Python

"""
Test suite for AbstractNode Scenarios
Usage:
python -m coverage run -a -m unittest tests/test_abstract_node_scenarios.py
Assisted By: Claude Code
"""
import unittest
from unittest.mock import Mock, patch
from krkn.scenario_plugins.node_actions.abstract_node_scenarios import abstract_node_scenarios
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
class TestAbstractNodeScenarios(unittest.TestCase):
    """Unit tests for the abstract_node_scenarios class.

    Every collaborator (Kubernetes client, affected-node status, shell
    runner, node-status waiters, logging, sleeping) is replaced by a mock, so
    the tests only check how the scenario methods delegate and what they
    report.
    """

    def setUp(self):
        """Build a scenarios object wired to mocked collaborators."""
        self.mock_kubecli = Mock(spec=KrknKubernetes)
        self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus)
        # A real list so tests can count the nodes the scenarios record.
        self.mock_affected_nodes_status.affected_nodes = []
        self.node_action_kube_check = True
        self.scenarios = abstract_node_scenarios(
            kubecli=self.mock_kubecli,
            node_action_kube_check=self.node_action_kube_check,
            affected_nodes_status=self.mock_affected_nodes_status,
        )

    def _assert_every_run_used(self, mock_run, expected_command, expected_count):
        """Assert the mocked runner was called exactly as expected each time.

        ``assert_called_with`` only inspects the most recent call, so a wrong
        command on an earlier iteration would go unnoticed; this checks the
        whole recorded call list instead.

        Args:
            mock_run: the mock standing in for ``runcommand.run``.
            expected_command: the single positional argument every call
                must have received.
            expected_count: the exact number of calls expected.
        """
        self.assertEqual(mock_run.call_count, expected_count)
        for positional, keyword in mock_run.call_args_list:
            self.assertEqual(positional, (expected_command,))
            self.assertEqual(keyword, {})

    def test_init(self):
        """Constructor arguments are stored on the instance."""
        self.assertEqual(self.scenarios.kubecli, self.mock_kubecli)
        self.assertEqual(
            self.scenarios.affected_nodes_status, self.mock_affected_nodes_status
        )
        self.assertTrue(self.scenarios.node_action_kube_check)

    @patch('time.sleep')
    @patch('logging.info')
    def test_node_stop_start_scenario(self, mock_logging, mock_sleep):
        """node_stop_start_scenario stops, sleeps for the duration, then starts."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        duration = 60
        poll_interval = 10
        self.scenarios.node_stop_scenario = Mock()
        self.scenarios.node_start_scenario = Mock()

        # Act
        self.scenarios.node_stop_start_scenario(
            instance_kill_count, node, timeout, duration, poll_interval
        )

        # Assert
        self.scenarios.node_stop_scenario.assert_called_once_with(
            instance_kill_count, node, timeout, poll_interval
        )
        mock_sleep.assert_called_once_with(duration)
        self.scenarios.node_start_scenario.assert_called_once_with(
            instance_kill_count, node, timeout, poll_interval
        )
        self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()

    @patch('logging.info')
    def test_helper_node_stop_start_scenario(self, mock_logging):
        """helper_node_stop_start_scenario delegates to helper stop and start."""
        # Arrange
        instance_kill_count = 1
        node = "helper-node"
        timeout = 300
        self.scenarios.helper_node_stop_scenario = Mock()
        self.scenarios.helper_node_start_scenario = Mock()

        # Act
        self.scenarios.helper_node_stop_start_scenario(
            instance_kill_count, node, timeout
        )

        # Assert
        self.scenarios.helper_node_stop_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )
        self.scenarios.helper_node_start_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )

    @patch('time.sleep')
    @patch('logging.info')
    def test_node_disk_detach_attach_scenario_success(self, mock_logging, mock_sleep):
        """A node with an extra disk gets it detached and later re-attached."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        duration = 60
        disk_details = {"disk_id": "disk-123", "device": "/dev/sdb"}
        self.scenarios.get_disk_attachment_info = Mock(return_value=disk_details)
        self.scenarios.disk_detach_scenario = Mock()
        self.scenarios.disk_attach_scenario = Mock()

        # Act
        self.scenarios.node_disk_detach_attach_scenario(
            instance_kill_count, node, timeout, duration
        )

        # Assert
        self.scenarios.get_disk_attachment_info.assert_called_once_with(
            instance_kill_count, node
        )
        self.scenarios.disk_detach_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )
        mock_sleep.assert_called_once_with(duration)
        # The attach step receives the disk details, not the node name.
        self.scenarios.disk_attach_scenario.assert_called_once_with(
            instance_kill_count, disk_details, timeout
        )

    @patch('logging.error')
    @patch('logging.info')
    def test_node_disk_detach_attach_scenario_no_disk(self, mock_info, mock_error):
        """With no disk attachment info, nothing is detached and an error is logged."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        duration = 60
        self.scenarios.get_disk_attachment_info = Mock(return_value=None)
        self.scenarios.disk_detach_scenario = Mock()
        self.scenarios.disk_attach_scenario = Mock()

        # Act
        self.scenarios.node_disk_detach_attach_scenario(
            instance_kill_count, node, timeout, duration
        )

        # Assert
        self.scenarios.disk_detach_scenario.assert_not_called()
        self.scenarios.disk_attach_scenario.assert_not_called()
        mock_error.assert_any_call("Node %s has only root disk attached" % node)

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.info')
    def test_stop_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait):
        """stop_kubelet_scenario runs the stop command once per requested kill."""
        # Arrange
        instance_kill_count = 2
        node = "test-node"
        timeout = 300
        mock_affected_node = Mock(spec=AffectedNode)
        mock_wait.return_value = None

        # Act
        with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class:
            mock_affected_node_class.return_value = mock_affected_node
            self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout)

        # Assert
        expected_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet"
        self._assert_every_run_used(mock_run, expected_command, instance_kill_count)
        self.assertEqual(mock_wait.call_count, 2)
        self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2)

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.error')
    @patch('logging.info')
    def test_stop_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait):
        """A failing stop command is logged as an error and re-raised."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        error_msg = "Command failed"
        mock_run.side_effect = Exception(error_msg)

        # Act & Assert
        with self.assertRaises(Exception):
            with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
                self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout)
        # Checked after the assertRaises block: a statement placed after the
        # raising call inside that block would never execute.
        mock_error.assert_any_call(
            "Failed to stop the kubelet of the node. Encountered following "
            "exception: %s. Test Failed" % error_msg
        )

    @patch('logging.info')
    def test_stop_start_kubelet_scenario(self, mock_logging):
        """stop_start_kubelet_scenario stops the kubelet, reboots, then merges."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        self.scenarios.stop_kubelet_scenario = Mock()
        self.scenarios.node_reboot_scenario = Mock()

        # Act
        self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout)

        # Assert
        self.scenarios.stop_kubelet_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )
        self.scenarios.node_reboot_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )
        self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.info')
    def test_restart_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait):
        """restart_kubelet_scenario runs the restart command once per requested kill."""
        # Arrange
        instance_kill_count = 2
        node = "test-node"
        timeout = 300
        mock_affected_node = Mock(spec=AffectedNode)
        mock_wait.return_value = None

        # Act
        with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class:
            mock_affected_node_class.return_value = mock_affected_node
            self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout)

        # Assert
        expected_command = "oc debug node/" + node + " -- chroot /host systemctl restart kubelet &"
        self._assert_every_run_used(mock_run, expected_command, instance_kill_count)
        self.assertEqual(mock_wait.call_count, 2)
        self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2)

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.error')
    @patch('logging.info')
    def test_restart_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait):
        """A failing restart command is logged as an error and re-raised."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        error_msg = "Restart failed"
        mock_run.side_effect = Exception(error_msg)

        # Act & Assert
        with self.assertRaises(Exception):
            with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
                self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout)
        # Checked after the assertRaises block: a statement placed after the
        # raising call inside that block would never execute.
        mock_error.assert_any_call(
            "Failed to restart the kubelet of the node. Encountered following "
            "exception: %s. Test Failed" % error_msg
        )

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.info')
    def test_node_crash_scenario_success(self, mock_logging, mock_run):
        """node_crash_scenario runs the crash command per kill and returns None."""
        # Arrange
        instance_kill_count = 2
        node = "test-node"
        timeout = 300

        # Act
        result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout)

        # Assert
        expected_command = (
            "oc debug node/" + node + " -- chroot /host "
            "dd if=/dev/urandom of=/proc/sysrq-trigger"
        )
        self._assert_every_run_used(mock_run, expected_command, instance_kill_count)
        self.assertIsNone(result)

    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    @patch('logging.error')
    @patch('logging.info')
    def test_node_crash_scenario_failure(self, mock_info, mock_error, mock_run):
        """A failing crash command is logged and reported as return value 1."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        error_msg = "Crash command failed"
        mock_run.side_effect = Exception(error_msg)

        # Act
        result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout)

        # Assert: unlike the kubelet scenarios, this one is expected to
        # return 1 rather than raise.
        self.assertEqual(result, 1)
        mock_error.assert_any_call(
            "Failed to crash the node. Encountered following exception: %s. "
            "Test Failed" % error_msg
        )

    def test_node_start_scenario_not_implemented(self):
        """node_start_scenario on the base class returns None."""
        result = self.scenarios.node_start_scenario(1, "test-node", 300, 10)
        self.assertIsNone(result)

    def test_node_stop_scenario_not_implemented(self):
        """node_stop_scenario on the base class returns None."""
        result = self.scenarios.node_stop_scenario(1, "test-node", 300, 10)
        self.assertIsNone(result)

    def test_node_termination_scenario_not_implemented(self):
        """node_termination_scenario on the base class returns None."""
        result = self.scenarios.node_termination_scenario(1, "test-node", 300, 10)
        self.assertIsNone(result)

    def test_node_reboot_scenario_not_implemented(self):
        """node_reboot_scenario on the base class returns None."""
        result = self.scenarios.node_reboot_scenario(1, "test-node", 300)
        self.assertIsNone(result)

    def test_node_service_status_not_implemented(self):
        """node_service_status on the base class returns None."""
        result = self.scenarios.node_service_status("test-node", "service", "key", 300)
        self.assertIsNone(result)

    def test_node_block_scenario_not_implemented(self):
        """node_block_scenario on the base class returns None."""
        result = self.scenarios.node_block_scenario(1, "test-node", 300, 60)
        self.assertIsNone(result)
class TestAbstractNodeScenariosIntegration(unittest.TestCase):
    """Workflow-level tests for abstract_node_scenarios.

    These exercise a composite scenario end to end while still mocking the
    external collaborators (shell runner, node-status waiter, sleeping).
    """

    def setUp(self):
        """Build a scenarios object wired to mocked collaborators."""
        self.mock_kubecli = Mock(spec=KrknKubernetes)
        self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus)
        self.mock_affected_nodes_status.affected_nodes = []
        self.scenarios = abstract_node_scenarios(
            kubecli=self.mock_kubecli,
            node_action_kube_check=True,
            affected_nodes_status=self.mock_affected_nodes_status,
        )

    @patch('time.sleep')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
    @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
    def test_complete_stop_start_kubelet_workflow(self, mock_run, mock_wait, mock_sleep):
        """stop_start_kubelet_scenario really stops the kubelet, then reboots."""
        # Arrange: only the reboot step is stubbed; the stop step runs for
        # real against the mocked runner and waiter.
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        self.scenarios.node_reboot_scenario = Mock()

        # Act
        with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
            self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout)

        # Assert - verify stop kubelet was called
        expected_stop_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet"
        mock_run.assert_any_call(expected_stop_command)
        # The stop step waits for the node status once per kill.
        mock_wait.assert_called_once()
        # Verify reboot was called
        self.scenarios.node_reboot_scenario.assert_called_once_with(
            instance_kill_count, node, timeout
        )
        # Verify merge was called
        self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()

    @patch('time.sleep')
    def test_node_stop_start_scenario_workflow(self, mock_sleep):
        """node_stop_start_scenario performs stop, sleep and start in that order."""
        # Arrange
        instance_kill_count = 1
        node = "test-node"
        timeout = 300
        duration = 60
        poll_interval = 10
        # Children of one parent mock record their calls in the parent's
        # chronological ``mock_calls`` list, which makes the relative order
        # of the three steps assertable.
        timeline = Mock()
        self.scenarios.node_stop_scenario = timeline.stop
        self.scenarios.node_start_scenario = timeline.start
        timeline.attach_mock(mock_sleep, "sleep")

        # Act
        self.scenarios.node_stop_start_scenario(
            instance_kill_count, node, timeout, duration, poll_interval
        )

        # Assert - each step ran exactly once
        self.scenarios.node_stop_scenario.assert_called_once()
        mock_sleep.assert_called_once_with(duration)
        self.scenarios.node_start_scenario.assert_called_once()
        # Assert - verify order of operations: stop, then sleep, then start
        observed_order = [name for name, _args, _kwargs in timeline.mock_calls]
        self.assertEqual(observed_order, ["stop", "sleep", "start"])
        # Verify merge was called
        self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()
# Run the suite when this file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()