increasing node action coverage (#1010)

Signed-off-by: Paige Patton <prubenda@redhat.com>
This commit is contained in:
Paige Patton
2025-12-22 11:36:10 -05:00
committed by GitHub
parent 4c74df301f
commit ff3c4f5313
3 changed files with 1600 additions and 1 deletions

View File

@@ -0,0 +1,415 @@
"""
Test suite for AbstractNode Scenarios
Usage:
python -m coverage run -a -m unittest tests/test_abstract_node_scenarios.py
Assisted By: Claude Code
"""
import unittest
from unittest.mock import Mock, patch
from krkn.scenario_plugins.node_actions.abstract_node_scenarios import abstract_node_scenarios
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
class TestAbstractNodeScenarios(unittest.TestCase):
"""Test suite for abstract_node_scenarios class"""
def setUp(self):
"""Set up test fixtures before each test method"""
self.mock_kubecli = Mock(spec=KrknKubernetes)
self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus)
self.mock_affected_nodes_status.affected_nodes = []
self.node_action_kube_check = True
self.scenarios = abstract_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=self.node_action_kube_check,
affected_nodes_status=self.mock_affected_nodes_status
)
def test_init(self):
"""Test initialization of abstract_node_scenarios"""
self.assertEqual(self.scenarios.kubecli, self.mock_kubecli)
self.assertEqual(self.scenarios.affected_nodes_status, self.mock_affected_nodes_status)
self.assertTrue(self.scenarios.node_action_kube_check)
@patch('time.sleep')
@patch('logging.info')
def test_node_stop_start_scenario(self, mock_logging, mock_sleep):
"""Test node_stop_start_scenario calls stop and start in sequence"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
duration = 60
poll_interval = 10
self.scenarios.node_stop_scenario = Mock()
self.scenarios.node_start_scenario = Mock()
# Act
self.scenarios.node_stop_start_scenario(
instance_kill_count, node, timeout, duration, poll_interval
)
# Assert
self.scenarios.node_stop_scenario.assert_called_once_with(
instance_kill_count, node, timeout, poll_interval
)
mock_sleep.assert_called_once_with(duration)
self.scenarios.node_start_scenario.assert_called_once_with(
instance_kill_count, node, timeout, poll_interval
)
self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()
@patch('logging.info')
def test_helper_node_stop_start_scenario(self, mock_logging):
"""Test helper_node_stop_start_scenario calls helper stop and start"""
# Arrange
instance_kill_count = 1
node = "helper-node"
timeout = 300
self.scenarios.helper_node_stop_scenario = Mock()
self.scenarios.helper_node_start_scenario = Mock()
# Act
self.scenarios.helper_node_stop_start_scenario(instance_kill_count, node, timeout)
# Assert
self.scenarios.helper_node_stop_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
self.scenarios.helper_node_start_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
@patch('time.sleep')
@patch('logging.info')
def test_node_disk_detach_attach_scenario_success(self, mock_logging, mock_sleep):
"""Test disk detach/attach scenario with valid disk attachment"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
duration = 60
disk_details = {"disk_id": "disk-123", "device": "/dev/sdb"}
self.scenarios.get_disk_attachment_info = Mock(return_value=disk_details)
self.scenarios.disk_detach_scenario = Mock()
self.scenarios.disk_attach_scenario = Mock()
# Act
self.scenarios.node_disk_detach_attach_scenario(
instance_kill_count, node, timeout, duration
)
# Assert
self.scenarios.get_disk_attachment_info.assert_called_once_with(
instance_kill_count, node
)
self.scenarios.disk_detach_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
mock_sleep.assert_called_once_with(duration)
self.scenarios.disk_attach_scenario.assert_called_once_with(
instance_kill_count, disk_details, timeout
)
@patch('logging.error')
@patch('logging.info')
def test_node_disk_detach_attach_scenario_no_disk(self, mock_info, mock_error):
"""Test disk detach/attach scenario when only root disk exists"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
duration = 60
self.scenarios.get_disk_attachment_info = Mock(return_value=None)
self.scenarios.disk_detach_scenario = Mock()
self.scenarios.disk_attach_scenario = Mock()
# Act
self.scenarios.node_disk_detach_attach_scenario(
instance_kill_count, node, timeout, duration
)
# Assert
self.scenarios.disk_detach_scenario.assert_not_called()
self.scenarios.disk_attach_scenario.assert_not_called()
mock_error.assert_any_call("Node %s has only root disk attached" % node)
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.info')
def test_stop_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait):
"""Test successful kubelet stop scenario"""
# Arrange
instance_kill_count = 2
node = "test-node"
timeout = 300
mock_affected_node = Mock(spec=AffectedNode)
mock_wait.return_value = None
# Act
with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class:
mock_affected_node_class.return_value = mock_affected_node
self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout)
# Assert
self.assertEqual(mock_run.call_count, 2)
expected_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet"
mock_run.assert_called_with(expected_command)
self.assertEqual(mock_wait.call_count, 2)
self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2)
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.error')
@patch('logging.info')
def test_stop_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait):
"""Test kubelet stop scenario when command fails"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
error_msg = "Command failed"
mock_run.side_effect = Exception(error_msg)
# Act & Assert
with self.assertRaises(Exception):
with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout)
mock_error.assert_any_call(
"Failed to stop the kubelet of the node. Encountered following "
"exception: %s. Test Failed" % error_msg
)
@patch('logging.info')
def test_stop_start_kubelet_scenario(self, mock_logging):
"""Test stop/start kubelet scenario"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
self.scenarios.stop_kubelet_scenario = Mock()
self.scenarios.node_reboot_scenario = Mock()
# Act
self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout)
# Assert
self.scenarios.stop_kubelet_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
self.scenarios.node_reboot_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.info')
def test_restart_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait):
"""Test successful kubelet restart scenario"""
# Arrange
instance_kill_count = 2
node = "test-node"
timeout = 300
mock_affected_node = Mock(spec=AffectedNode)
mock_wait.return_value = None
# Act
with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class:
mock_affected_node_class.return_value = mock_affected_node
self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout)
# Assert
self.assertEqual(mock_run.call_count, 2)
expected_command = "oc debug node/" + node + " -- chroot /host systemctl restart kubelet &"
mock_run.assert_called_with(expected_command)
self.assertEqual(mock_wait.call_count, 2)
self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2)
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.error')
@patch('logging.info')
def test_restart_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait):
"""Test kubelet restart scenario when command fails"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
error_msg = "Restart failed"
mock_run.side_effect = Exception(error_msg)
# Act & Assert
with self.assertRaises(Exception):
with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout)
mock_error.assert_any_call(
"Failed to restart the kubelet of the node. Encountered following "
"exception: %s. Test Failed" % error_msg
)
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.info')
def test_node_crash_scenario_success(self, mock_logging, mock_run):
"""Test successful node crash scenario"""
# Arrange
instance_kill_count = 2
node = "test-node"
timeout = 300
# Act
result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout)
# Assert
self.assertEqual(mock_run.call_count, 2)
expected_command = (
"oc debug node/" + node + " -- chroot /host "
"dd if=/dev/urandom of=/proc/sysrq-trigger"
)
mock_run.assert_called_with(expected_command)
self.assertIsNone(result)
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
@patch('logging.error')
@patch('logging.info')
def test_node_crash_scenario_failure(self, mock_info, mock_error, mock_run):
"""Test node crash scenario when command fails"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
error_msg = "Crash command failed"
mock_run.side_effect = Exception(error_msg)
# Act
result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout)
# Assert
self.assertEqual(result, 1)
mock_error.assert_any_call(
"Failed to crash the node. Encountered following exception: %s. "
"Test Failed" % error_msg
)
def test_node_start_scenario_not_implemented(self):
"""Test that node_start_scenario returns None (not implemented)"""
result = self.scenarios.node_start_scenario(1, "test-node", 300, 10)
self.assertIsNone(result)
def test_node_stop_scenario_not_implemented(self):
"""Test that node_stop_scenario returns None (not implemented)"""
result = self.scenarios.node_stop_scenario(1, "test-node", 300, 10)
self.assertIsNone(result)
def test_node_termination_scenario_not_implemented(self):
"""Test that node_termination_scenario returns None (not implemented)"""
result = self.scenarios.node_termination_scenario(1, "test-node", 300, 10)
self.assertIsNone(result)
def test_node_reboot_scenario_not_implemented(self):
"""Test that node_reboot_scenario returns None (not implemented)"""
result = self.scenarios.node_reboot_scenario(1, "test-node", 300)
self.assertIsNone(result)
def test_node_service_status_not_implemented(self):
"""Test that node_service_status returns None (not implemented)"""
result = self.scenarios.node_service_status("test-node", "service", "key", 300)
self.assertIsNone(result)
def test_node_block_scenario_not_implemented(self):
"""Test that node_block_scenario returns None (not implemented)"""
result = self.scenarios.node_block_scenario(1, "test-node", 300, 60)
self.assertIsNone(result)
class TestAbstractNodeScenariosIntegration(unittest.TestCase):
"""Integration tests for abstract_node_scenarios workflows"""
def setUp(self):
"""Set up test fixtures before each test method"""
self.mock_kubecli = Mock(spec=KrknKubernetes)
self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus)
self.mock_affected_nodes_status.affected_nodes = []
self.scenarios = abstract_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=True,
affected_nodes_status=self.mock_affected_nodes_status
)
@patch('time.sleep')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status')
@patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run')
def test_complete_stop_start_kubelet_workflow(self, mock_run, mock_wait, mock_sleep):
"""Test complete workflow of stop/start kubelet scenario"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
self.scenarios.node_reboot_scenario = Mock()
# Act
with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'):
self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout)
# Assert - verify stop kubelet was called
expected_stop_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet"
mock_run.assert_any_call(expected_stop_command)
# Verify reboot was called
self.scenarios.node_reboot_scenario.assert_called_once_with(
instance_kill_count, node, timeout
)
# Verify merge was called
self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()
@patch('time.sleep')
def test_node_stop_start_scenario_workflow(self, mock_sleep):
"""Test complete workflow of node stop/start scenario"""
# Arrange
instance_kill_count = 1
node = "test-node"
timeout = 300
duration = 60
poll_interval = 10
self.scenarios.node_stop_scenario = Mock()
self.scenarios.node_start_scenario = Mock()
# Act
self.scenarios.node_stop_start_scenario(
instance_kill_count, node, timeout, duration, poll_interval
)
# Assert - verify order of operations
call_order = []
# Verify stop was called first
self.scenarios.node_stop_scenario.assert_called_once()
# Verify sleep was called
mock_sleep.assert_called_once_with(duration)
# Verify start was called after sleep
self.scenarios.node_start_scenario.assert_called_once()
# Verify merge was called
self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once()
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,476 @@
#!/usr/bin/env python3
"""
Test suite for common_node_functions module
Usage:
python -m coverage run -a -m unittest tests/test_common_node_functions.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, Mock, patch, call
import logging
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode
from krkn.scenario_plugins.node_actions import common_node_functions
class TestCommonNodeFunctions(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures before each test
"""
self.mock_kubecli = Mock(spec=KrknKubernetes)
self.mock_affected_node = Mock(spec=AffectedNode)
def test_get_node_by_name_all_nodes_exist(self):
"""
Test get_node_by_name returns list when all nodes exist
"""
node_name_list = ["node1", "node2", "node3"]
self.mock_kubecli.list_killable_nodes.return_value = ["node1", "node2", "node3", "node4"]
result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli)
self.assertEqual(result, node_name_list)
self.mock_kubecli.list_killable_nodes.assert_called_once()
def test_get_node_by_name_single_node(self):
"""
Test get_node_by_name with single node
"""
node_name_list = ["worker-1"]
self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2"]
result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli)
self.assertEqual(result, node_name_list)
@patch('logging.info')
def test_get_node_by_name_node_not_exist(self, mock_logging):
"""
Test get_node_by_name returns None when node doesn't exist
"""
node_name_list = ["node1", "nonexistent-node"]
self.mock_kubecli.list_killable_nodes.return_value = ["node1", "node2", "node3"]
result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli)
self.assertIsNone(result)
mock_logging.assert_called()
self.assertIn("does not exist", str(mock_logging.call_args))
@patch('logging.info')
def test_get_node_by_name_empty_killable_list(self, mock_logging):
"""
Test get_node_by_name when no killable nodes exist
"""
node_name_list = ["node1"]
self.mock_kubecli.list_killable_nodes.return_value = []
result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli)
self.assertIsNone(result)
mock_logging.assert_called()
@patch('logging.info')
def test_get_node_single_label_selector(self, mock_logging):
"""
Test get_node with single label selector
"""
label_selector = "node-role.kubernetes.io/worker"
instance_kill_count = 2
self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2", "worker-3"]
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(len(result), 2)
self.assertTrue(all(node in ["worker-1", "worker-2", "worker-3"] for node in result))
self.mock_kubecli.list_killable_nodes.assert_called_once_with(label_selector)
mock_logging.assert_called()
@patch('logging.info')
def test_get_node_multiple_label_selectors(self, mock_logging):
"""
Test get_node with multiple comma-separated label selectors
"""
label_selector = "node-role.kubernetes.io/worker,topology.kubernetes.io/zone=us-east-1a"
instance_kill_count = 3
self.mock_kubecli.list_killable_nodes.side_effect = [
["worker-1", "worker-2"],
["worker-3", "worker-4"]
]
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(len(result), 3)
self.assertTrue(all(node in ["worker-1", "worker-2", "worker-3", "worker-4"] for node in result))
self.assertEqual(self.mock_kubecli.list_killable_nodes.call_count, 2)
@patch('logging.info')
def test_get_node_return_all_when_count_equals_total(self, mock_logging):
"""
Test get_node returns all nodes when instance_kill_count equals number of nodes
"""
label_selector = "node-role.kubernetes.io/worker"
nodes = ["worker-1", "worker-2", "worker-3"]
instance_kill_count = 3
self.mock_kubecli.list_killable_nodes.return_value = nodes
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(result, nodes)
@patch('logging.info')
def test_get_node_return_all_when_count_is_zero(self, mock_logging):
"""
Test get_node returns all nodes when instance_kill_count is 0
"""
label_selector = "node-role.kubernetes.io/worker"
nodes = ["worker-1", "worker-2", "worker-3"]
instance_kill_count = 0
self.mock_kubecli.list_killable_nodes.return_value = nodes
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(result, nodes)
@patch('logging.info')
@patch('random.randint')
def test_get_node_random_selection(self, mock_randint, mock_logging):
"""
Test get_node randomly selects nodes when count is less than total
"""
label_selector = "node-role.kubernetes.io/worker"
instance_kill_count = 2
self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2", "worker-3", "worker-4"]
# Mock random selection to return predictable values
mock_randint.side_effect = [1, 0] # Select index 1, then index 0
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(len(result), 2)
# Verify nodes were removed after selection to avoid duplicates
self.assertEqual(len(set(result)), 2)
def test_get_node_no_nodes_with_label(self):
"""
Test get_node raises exception when no nodes match label selector
"""
label_selector = "nonexistent-label"
instance_kill_count = 1
self.mock_kubecli.list_killable_nodes.return_value = []
with self.assertRaises(Exception) as context:
common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertIn("Ready nodes with the provided label selector do not exist", str(context.exception))
def test_get_node_single_node_available(self):
"""
Test get_node when only one node is available
"""
label_selector = "node-role.kubernetes.io/master"
instance_kill_count = 1
self.mock_kubecli.list_killable_nodes.return_value = ["master-1"]
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(result, ["master-1"])
def test_wait_for_ready_status_without_affected_node(self):
"""
Test wait_for_ready_status without providing affected_node
"""
node = "test-node"
timeout = 300
expected_affected_node = Mock(spec=AffectedNode)
self.mock_kubecli.watch_node_status.return_value = expected_affected_node
result = common_node_functions.wait_for_ready_status(node, timeout, self.mock_kubecli)
self.assertEqual(result, expected_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(node, "True", timeout, None)
def test_wait_for_ready_status_with_affected_node(self):
"""
Test wait_for_ready_status with provided affected_node
"""
node = "test-node"
timeout = 300
self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node
result = common_node_functions.wait_for_ready_status(
node, timeout, self.mock_kubecli, self.mock_affected_node
)
self.assertEqual(result, self.mock_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(
node, "True", timeout, self.mock_affected_node
)
def test_wait_for_not_ready_status_without_affected_node(self):
"""
Test wait_for_not_ready_status without providing affected_node
"""
node = "test-node"
timeout = 300
expected_affected_node = Mock(spec=AffectedNode)
self.mock_kubecli.watch_node_status.return_value = expected_affected_node
result = common_node_functions.wait_for_not_ready_status(node, timeout, self.mock_kubecli)
self.assertEqual(result, expected_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(node, "False", timeout, None)
def test_wait_for_not_ready_status_with_affected_node(self):
"""
Test wait_for_not_ready_status with provided affected_node
"""
node = "test-node"
timeout = 300
self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node
result = common_node_functions.wait_for_not_ready_status(
node, timeout, self.mock_kubecli, self.mock_affected_node
)
self.assertEqual(result, self.mock_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(
node, "False", timeout, self.mock_affected_node
)
def test_wait_for_unknown_status_without_affected_node(self):
"""
Test wait_for_unknown_status without providing affected_node
"""
node = "test-node"
timeout = 300
expected_affected_node = Mock(spec=AffectedNode)
self.mock_kubecli.watch_node_status.return_value = expected_affected_node
result = common_node_functions.wait_for_unknown_status(node, timeout, self.mock_kubecli)
self.assertEqual(result, expected_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(node, "Unknown", timeout, None)
def test_wait_for_unknown_status_with_affected_node(self):
"""
Test wait_for_unknown_status with provided affected_node
"""
node = "test-node"
timeout = 300
self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node
result = common_node_functions.wait_for_unknown_status(
node, timeout, self.mock_kubecli, self.mock_affected_node
)
self.assertEqual(result, self.mock_affected_node)
self.mock_kubecli.watch_node_status.assert_called_once_with(
node, "Unknown", timeout, self.mock_affected_node
)
@patch('time.sleep')
@patch('logging.info')
@patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient')
def test_check_service_status_success(self, mock_ssh_client, mock_logging, mock_sleep):
"""
Test check_service_status successfully checks service status
"""
node = "192.168.1.100"
service = ["neutron-server", "nova-compute"]
ssh_private_key = "~/.ssh/id_rsa"
timeout = 60
# Mock SSH client
mock_ssh = Mock()
mock_ssh_client.return_value = mock_ssh
mock_ssh.connect.return_value = None
# Mock exec_command to return active status
mock_stdout = Mock()
mock_stdout.readlines.return_value = ["active\n"]
mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock())
common_node_functions.check_service_status(node, service, ssh_private_key, timeout)
# Verify SSH connection was attempted
mock_ssh.connect.assert_called()
# Verify service status was checked for each service
self.assertEqual(mock_ssh.exec_command.call_count, 2)
# Verify SSH connection was closed
mock_ssh.close.assert_called_once()
@patch('time.sleep')
@patch('logging.error')
@patch('logging.info')
@patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient')
def test_check_service_status_service_inactive(self, mock_ssh_client, mock_logging_info, mock_logging_error, mock_sleep):
"""
Test check_service_status logs error when service is inactive
"""
node = "192.168.1.100"
service = ["neutron-server"]
ssh_private_key = "~/.ssh/id_rsa"
timeout = 60
# Mock SSH client
mock_ssh = Mock()
mock_ssh_client.return_value = mock_ssh
mock_ssh.connect.return_value = None
# Mock exec_command to return inactive status
mock_stdout = Mock()
mock_stdout.readlines.return_value = ["inactive\n"]
mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock())
common_node_functions.check_service_status(node, service, ssh_private_key, timeout)
# Verify error was logged for inactive service
mock_logging_error.assert_called()
error_call_str = str(mock_logging_error.call_args)
self.assertIn("inactive", error_call_str)
mock_ssh.close.assert_called_once()
@patch('time.sleep')
@patch('logging.error')
@patch('logging.info')
@patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient')
def test_check_service_status_ssh_connection_fails(self, mock_ssh_client, mock_logging_info, mock_logging_error, mock_sleep):
"""
Test check_service_status handles SSH connection failures
"""
node = "192.168.1.100"
service = ["neutron-server"]
ssh_private_key = "~/.ssh/id_rsa"
timeout = 5
# Mock SSH client to raise exception
mock_ssh = Mock()
mock_ssh_client.return_value = mock_ssh
mock_ssh.connect.side_effect = Exception("Connection timeout")
# Mock exec_command for when connection eventually works (or doesn't)
mock_stdout = Mock()
mock_stdout.readlines.return_value = ["active\n"]
mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock())
common_node_functions.check_service_status(node, service, ssh_private_key, timeout)
# Verify error was logged for SSH connection failure
mock_logging_error.assert_called()
error_call_str = str(mock_logging_error.call_args)
self.assertIn("Failed to ssh", error_call_str)
@patch('time.sleep')
@patch('logging.info')
@patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient')
def test_check_service_status_multiple_services(self, mock_ssh_client, mock_logging, mock_sleep):
"""
Test check_service_status with multiple services
"""
node = "192.168.1.100"
service = ["service1", "service2", "service3"]
ssh_private_key = "~/.ssh/id_rsa"
timeout = 60
# Mock SSH client
mock_ssh = Mock()
mock_ssh_client.return_value = mock_ssh
mock_ssh.connect.return_value = None
# Mock exec_command to return active status
mock_stdout = Mock()
mock_stdout.readlines.return_value = ["active\n"]
mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock())
common_node_functions.check_service_status(node, service, ssh_private_key, timeout)
# Verify service status was checked for all services
self.assertEqual(mock_ssh.exec_command.call_count, 3)
mock_ssh.close.assert_called_once()
@patch('time.sleep')
@patch('logging.info')
@patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient')
def test_check_service_status_retry_logic(self, mock_ssh_client, mock_logging, mock_sleep):
"""
Test check_service_status retry logic on connection failure then success
"""
node = "192.168.1.100"
service = ["neutron-server"]
ssh_private_key = "~/.ssh/id_rsa"
timeout = 10
# Mock SSH client
mock_ssh = Mock()
mock_ssh_client.return_value = mock_ssh
# First two attempts fail, third succeeds
mock_ssh.connect.side_effect = [
Exception("Timeout"),
Exception("Timeout"),
None # Success
]
# Mock exec_command
mock_stdout = Mock()
mock_stdout.readlines.return_value = ["active\n"]
mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock())
common_node_functions.check_service_status(node, service, ssh_private_key, timeout)
# Verify multiple connection attempts were made
self.assertGreater(mock_ssh.connect.call_count, 1)
# Verify service was eventually checked
mock_ssh.exec_command.assert_called()
mock_ssh.close.assert_called_once()
class TestCommonNodeFunctionsIntegration(unittest.TestCase):
"""Integration-style tests for common_node_functions"""
def setUp(self):
"""Set up test fixtures"""
self.mock_kubecli = Mock(spec=KrknKubernetes)
@patch('logging.info')
def test_get_node_workflow_with_label_filtering(self, mock_logging):
"""
Test complete workflow of getting nodes with label selector and filtering
"""
label_selector = "node-role.kubernetes.io/worker"
instance_kill_count = 2
available_nodes = ["worker-1", "worker-2", "worker-3", "worker-4", "worker-5"]
self.mock_kubecli.list_killable_nodes.return_value = available_nodes
result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli)
self.assertEqual(len(result), 2)
# Verify no duplicates
self.assertEqual(len(result), len(set(result)))
# Verify all nodes are from the available list
self.assertTrue(all(node in available_nodes for node in result))
@patch('logging.info')
def test_get_node_by_name_validation_workflow(self, mock_logging):
"""
Test complete workflow of validating node names
"""
requested_nodes = ["node-a", "node-b"]
killable_nodes = ["node-a", "node-b", "node-c", "node-d"]
self.mock_kubecli.list_killable_nodes.return_value = killable_nodes
result = common_node_functions.get_node_by_name(requested_nodes, self.mock_kubecli)
self.assertEqual(result, requested_nodes)
self.mock_kubecli.list_killable_nodes.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@@ -10,10 +10,15 @@ Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from unittest.mock import MagicMock, Mock, patch, mock_open, call
import yaml
import tempfile
import os
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.models.k8s import AffectedNodeStatus
from krkn.scenario_plugins.node_actions.node_actions_scenario_plugin import NodeActionsScenarioPlugin
@@ -24,7 +29,16 @@ class TestNodeActionsScenarioPlugin(unittest.TestCase):
"""
Set up test fixtures for NodeActionsScenarioPlugin
"""
# Reset node_general global variable before each test
import krkn.scenario_plugins.node_actions.node_actions_scenario_plugin as plugin_module
plugin_module.node_general = False
self.plugin = NodeActionsScenarioPlugin()
self.mock_kubecli = Mock(spec=KrknKubernetes)
self.mock_lib_telemetry = Mock(spec=KrknTelemetryOpenshift)
self.mock_lib_telemetry.get_lib_kubernetes.return_value = self.mock_kubecli
self.mock_scenario_telemetry = Mock(spec=ScenarioTelemetry)
self.mock_scenario_telemetry.affected_nodes = []
def test_get_scenario_types(self):
"""
@@ -35,6 +49,700 @@ class TestNodeActionsScenarioPlugin(unittest.TestCase):
self.assertEqual(result, ["node_scenarios"])
self.assertEqual(len(result), 1)
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios')
def test_get_node_scenario_object_generic(self, mock_general_scenarios):
"""
Test get_node_scenario_object returns general_node_scenarios for generic cloud type
"""
node_scenario = {"cloud_type": "generic"}
mock_general_instance = Mock()
mock_general_scenarios.return_value = mock_general_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_general_instance)
mock_general_scenarios.assert_called_once()
args = mock_general_scenarios.call_args[0]
self.assertEqual(args[0], self.mock_kubecli)
self.assertTrue(args[1]) # node_action_kube_check defaults to True
self.assertIsInstance(args[2], AffectedNodeStatus)
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios')
def test_get_node_scenario_object_no_cloud_type(self, mock_general_scenarios):
"""
Test get_node_scenario_object returns general_node_scenarios when cloud_type is not specified
"""
node_scenario = {}
mock_general_instance = Mock()
mock_general_scenarios.return_value = mock_general_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_general_instance)
mock_general_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.aws_node_scenarios')
def test_get_node_scenario_object_aws(self, mock_aws_scenarios):
"""
Test get_node_scenario_object returns aws_node_scenarios for AWS cloud type
"""
node_scenario = {"cloud_type": "aws"}
mock_aws_instance = Mock()
mock_aws_scenarios.return_value = mock_aws_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_aws_instance)
mock_aws_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.gcp_node_scenarios')
def test_get_node_scenario_object_gcp(self, mock_gcp_scenarios):
"""
Test get_node_scenario_object returns gcp_node_scenarios for GCP cloud type
"""
node_scenario = {"cloud_type": "gcp"}
mock_gcp_instance = Mock()
mock_gcp_scenarios.return_value = mock_gcp_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_gcp_instance)
mock_gcp_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.azure_node_scenarios')
def test_get_node_scenario_object_azure(self, mock_azure_scenarios):
"""
Test get_node_scenario_object returns azure_node_scenarios for Azure cloud type
"""
node_scenario = {"cloud_type": "azure"}
mock_azure_instance = Mock()
mock_azure_scenarios.return_value = mock_azure_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_azure_instance)
mock_azure_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.azure_node_scenarios')
def test_get_node_scenario_object_az(self, mock_azure_scenarios):
"""
Test get_node_scenario_object returns azure_node_scenarios for 'az' cloud type alias
"""
node_scenario = {"cloud_type": "az"}
mock_azure_instance = Mock()
mock_azure_scenarios.return_value = mock_azure_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_azure_instance)
mock_azure_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.docker_node_scenarios')
def test_get_node_scenario_object_docker(self, mock_docker_scenarios):
"""
Test get_node_scenario_object returns docker_node_scenarios for Docker cloud type
"""
node_scenario = {"cloud_type": "docker"}
mock_docker_instance = Mock()
mock_docker_scenarios.return_value = mock_docker_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_docker_instance)
mock_docker_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.vmware_node_scenarios')
def test_get_node_scenario_object_vmware(self, mock_vmware_scenarios):
"""
Test get_node_scenario_object returns vmware_node_scenarios for VMware cloud type
"""
node_scenario = {"cloud_type": "vmware"}
mock_vmware_instance = Mock()
mock_vmware_scenarios.return_value = mock_vmware_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_vmware_instance)
mock_vmware_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.vmware_node_scenarios')
def test_get_node_scenario_object_vsphere(self, mock_vmware_scenarios):
"""
Test get_node_scenario_object returns vmware_node_scenarios for vSphere cloud type alias
"""
node_scenario = {"cloud_type": "vsphere"}
mock_vmware_instance = Mock()
mock_vmware_scenarios.return_value = mock_vmware_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_vmware_instance)
mock_vmware_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibm_node_scenarios')
def test_get_node_scenario_object_ibm(self, mock_ibm_scenarios):
"""
Test get_node_scenario_object returns ibm_node_scenarios for IBM cloud type
"""
node_scenario = {"cloud_type": "ibm"}
mock_ibm_instance = Mock()
mock_ibm_scenarios.return_value = mock_ibm_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_ibm_instance)
mock_ibm_scenarios.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibm_node_scenarios')
def test_get_node_scenario_object_ibmcloud(self, mock_ibm_scenarios):
"""
Test get_node_scenario_object returns ibm_node_scenarios for ibmcloud cloud type alias
"""
node_scenario = {"cloud_type": "ibmcloud", "disable_ssl_verification": False}
mock_ibm_instance = Mock()
mock_ibm_scenarios.return_value = mock_ibm_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_ibm_instance)
args = mock_ibm_scenarios.call_args[0]
self.assertFalse(args[3]) # disable_ssl_verification should be False
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibmcloud_power_node_scenarios')
def test_get_node_scenario_object_ibmpower(self, mock_ibmpower_scenarios):
"""
Test get_node_scenario_object returns ibmcloud_power_node_scenarios for ibmpower cloud type
"""
node_scenario = {"cloud_type": "ibmpower"}
mock_ibmpower_instance = Mock()
mock_ibmpower_scenarios.return_value = mock_ibmpower_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_ibmpower_instance)
mock_ibmpower_scenarios.assert_called_once()
def test_get_node_scenario_object_openstack(self):
"""
Test get_node_scenario_object returns openstack_node_scenarios for OpenStack cloud type
"""
with patch('krkn.scenario_plugins.node_actions.openstack_node_scenarios.openstack_node_scenarios') as mock_openstack:
node_scenario = {"cloud_type": "openstack"}
mock_openstack_instance = Mock()
mock_openstack.return_value = mock_openstack_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_openstack_instance)
mock_openstack.assert_called_once()
def test_get_node_scenario_object_alibaba(self):
"""
Test get_node_scenario_object returns alibaba_node_scenarios for Alibaba cloud type
"""
with patch('krkn.scenario_plugins.node_actions.alibaba_node_scenarios.alibaba_node_scenarios') as mock_alibaba:
node_scenario = {"cloud_type": "alibaba"}
mock_alibaba_instance = Mock()
mock_alibaba.return_value = mock_alibaba_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_alibaba_instance)
mock_alibaba.assert_called_once()
def test_get_node_scenario_object_alicloud(self):
"""
Test get_node_scenario_object returns alibaba_node_scenarios for alicloud alias
"""
with patch('krkn.scenario_plugins.node_actions.alibaba_node_scenarios.alibaba_node_scenarios') as mock_alibaba:
node_scenario = {"cloud_type": "alicloud"}
mock_alibaba_instance = Mock()
mock_alibaba.return_value = mock_alibaba_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_alibaba_instance)
mock_alibaba.assert_called_once()
def test_get_node_scenario_object_bm(self):
"""
Test get_node_scenario_object returns bm_node_scenarios for bare metal cloud type
"""
with patch('krkn.scenario_plugins.node_actions.bm_node_scenarios.bm_node_scenarios') as mock_bm:
node_scenario = {
"cloud_type": "bm",
"bmc_info": "192.168.1.1",
"bmc_user": "admin",
"bmc_password": "password"
}
mock_bm_instance = Mock()
mock_bm.return_value = mock_bm_instance
result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertEqual(result, mock_bm_instance)
args = mock_bm.call_args[0]
self.assertEqual(args[0], "192.168.1.1")
self.assertEqual(args[1], "admin")
self.assertEqual(args[2], "password")
def test_get_node_scenario_object_unsupported_cloud(self):
"""
Test get_node_scenario_object raises exception for unsupported cloud type
"""
node_scenario = {"cloud_type": "unsupported_cloud"}
with self.assertRaises(Exception) as context:
self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli)
self.assertIn("not currently supported", str(context.exception))
self.assertIn("unsupported_cloud", str(context.exception))
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
def test_inject_node_scenario_with_node_name(self, mock_common_funcs):
"""
Test inject_node_scenario with specific node name
"""
node_scenario = {
"node_name": "node1,node2",
"instance_count": 2,
"runs": 1,
"timeout": 120,
"duration": 60,
"poll_interval": 15
}
action = "node_stop_start_scenario"
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
mock_common_funcs.get_node_by_name.return_value = ["node1", "node2"]
self.plugin.inject_node_scenario(
action,
node_scenario,
mock_scenario_object,
self.mock_kubecli,
self.mock_scenario_telemetry
)
mock_common_funcs.get_node_by_name.assert_called_once_with(["node1", "node2"], self.mock_kubecli)
self.assertEqual(mock_scenario_object.node_stop_start_scenario.call_count, 2)
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
def test_inject_node_scenario_with_label_selector(self, mock_common_funcs):
"""
Test inject_node_scenario with label selector
"""
node_scenario = {
"label_selector": "node-role.kubernetes.io/worker",
"instance_count": 1
}
action = "node_reboot_scenario"
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
mock_common_funcs.get_node.return_value = ["worker-node-1"]
self.plugin.inject_node_scenario(
action,
node_scenario,
mock_scenario_object,
self.mock_kubecli,
self.mock_scenario_telemetry
)
mock_common_funcs.get_node.assert_called_once_with("node-role.kubernetes.io/worker", 1, self.mock_kubecli)
mock_scenario_object.node_reboot_scenario.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
def test_inject_node_scenario_with_exclude_label(self, mock_common_funcs):
"""
Test inject_node_scenario with exclude label
"""
node_scenario = {
"label_selector": "node-role.kubernetes.io/worker",
"exclude_label": "node-role.kubernetes.io/master",
"instance_count": 2
}
action = "node_stop_scenario"
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
mock_common_funcs.get_node.side_effect = [
["worker-1", "master-1"],
["master-1"]
]
self.plugin.inject_node_scenario(
action,
node_scenario,
mock_scenario_object,
self.mock_kubecli,
self.mock_scenario_telemetry
)
self.assertEqual(mock_common_funcs.get_node.call_count, 2)
# Should only process worker-1 after excluding master-1
self.assertEqual(mock_scenario_object.node_stop_scenario.call_count, 1)
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
def test_inject_node_scenario_parallel_mode(self, mock_common_funcs):
"""
Test inject_node_scenario with parallel processing
"""
node_scenario = {
"node_name": "node1,node2,node3",
"parallel": True
}
action = "restart_kubelet_scenario"
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
mock_common_funcs.get_node_by_name.return_value = ["node1", "node2", "node3"]
with patch.object(self.plugin, 'multiprocess_nodes') as mock_multiprocess:
self.plugin.inject_node_scenario(
action,
node_scenario,
mock_scenario_object,
self.mock_kubecli,
self.mock_scenario_telemetry
)
mock_multiprocess.assert_called_once()
args = mock_multiprocess.call_args[0]
self.assertEqual(args[0], ["node1", "node2", "node3"])
self.assertEqual(args[2], action)
def test_run_node_node_start_scenario(self):
"""
Test run_node executes node_start_scenario action
"""
node_scenario = {"runs": 2, "timeout": 300, "poll_interval": 10}
action = "node_start_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_start_scenario.assert_called_once_with(2, "test-node", 300, 10)
def test_run_node_node_stop_scenario(self):
"""
Test run_node executes node_stop_scenario action
"""
node_scenario = {"runs": 1, "timeout": 120, "poll_interval": 15}
action = "node_stop_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_stop_scenario.assert_called_once_with(1, "test-node", 120, 15)
def test_run_node_node_stop_start_scenario(self):
"""
Test run_node executes node_stop_start_scenario action
"""
node_scenario = {"runs": 1, "timeout": 120, "duration": 60, "poll_interval": 15}
action = "node_stop_start_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_stop_start_scenario.assert_called_once_with(1, "test-node", 120, 60, 15)
def test_run_node_node_termination_scenario(self):
"""
Test run_node executes node_termination_scenario action
"""
node_scenario = {}
action = "node_termination_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_termination_scenario.assert_called_once_with(1, "test-node", 120, 15)
def test_run_node_node_reboot_scenario(self):
"""
Test run_node executes node_reboot_scenario action
"""
node_scenario = {"soft_reboot": True}
action = "node_reboot_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_reboot_scenario.assert_called_once_with(1, "test-node", 120, True)
def test_run_node_node_disk_detach_attach_scenario(self):
"""
Test run_node executes node_disk_detach_attach_scenario action
"""
node_scenario = {"duration": 90}
action = "node_disk_detach_attach_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_disk_detach_attach_scenario.assert_called_once_with(1, "test-node", 120, 90)
def test_run_node_stop_start_kubelet_scenario(self):
"""
Test run_node executes stop_start_kubelet_scenario action
"""
node_scenario = {}
action = "stop_start_kubelet_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.stop_start_kubelet_scenario.assert_called_once_with(1, "test-node", 120)
def test_run_node_restart_kubelet_scenario(self):
"""
Test run_node executes restart_kubelet_scenario action
"""
node_scenario = {}
action = "restart_kubelet_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.restart_kubelet_scenario.assert_called_once_with(1, "test-node", 120)
def test_run_node_stop_kubelet_scenario(self):
"""
Test run_node executes stop_kubelet_scenario action
"""
node_scenario = {}
action = "stop_kubelet_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.stop_kubelet_scenario.assert_called_once_with(1, "test-node", 120)
def test_run_node_node_crash_scenario(self):
"""
Test run_node executes node_crash_scenario action
"""
node_scenario = {}
action = "node_crash_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_crash_scenario.assert_called_once_with(1, "test-node", 120)
def test_run_node_node_block_scenario(self):
"""
Test run_node executes node_block_scenario action
"""
node_scenario = {"duration": 100}
action = "node_block_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.node_block_scenario.assert_called_once_with(1, "test-node", 120, 100)
@patch('logging.info')
def test_run_node_stop_start_helper_node_scenario_openstack(self, mock_logging):
"""
Test run_node executes stop_start_helper_node_scenario for OpenStack
"""
node_scenario = {
"cloud_type": "openstack",
"helper_node_ip": "192.168.1.100",
"service": "neutron-server"
}
action = "stop_start_helper_node_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_scenario_object.helper_node_stop_start_scenario.assert_called_once_with(1, "192.168.1.100", 120)
mock_scenario_object.helper_node_service_status.assert_called_once()
@patch('logging.error')
def test_run_node_stop_start_helper_node_scenario_non_openstack(self, mock_logging):
"""
Test run_node logs error for stop_start_helper_node_scenario on non-OpenStack
"""
node_scenario = {
"cloud_type": "aws",
"helper_node_ip": "192.168.1.100"
}
action = "stop_start_helper_node_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_logging.assert_called()
self.assertIn("not supported", str(mock_logging.call_args))
@patch('logging.error')
def test_run_node_stop_start_helper_node_scenario_missing_ip(self, mock_logging):
"""
Test run_node raises exception when helper_node_ip is missing
"""
node_scenario = {
"cloud_type": "openstack",
"helper_node_ip": None
}
action = "stop_start_helper_node_scenario"
mock_scenario_object = Mock()
with self.assertRaises(Exception) as context:
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
self.assertIn("Helper node IP address is not provided", str(context.exception))
@patch('logging.info')
def test_run_node_generic_cloud_skip_unsupported_action(self, mock_logging):
"""
Test run_node skips unsupported actions for generic cloud type
"""
# Set node_general to True for this test
import krkn.scenario_plugins.node_actions.node_actions_scenario_plugin as plugin_module
plugin_module.node_general = True
node_scenario = {}
action = "node_stop_scenario"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_logging.assert_called()
self.assertIn("not set up for generic cloud type", str(mock_logging.call_args))
@patch('logging.info')
def test_run_node_unknown_action(self, mock_logging):
"""
Test run_node logs info for unknown action
"""
node_scenario = {}
action = "unknown_action"
mock_scenario_object = Mock()
self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario)
mock_logging.assert_called()
# Could be either message depending on node_general state
call_str = str(mock_logging.call_args)
self.assertTrue(
"no node action that matches" in call_str or
"not set up for generic cloud type" in call_str
)
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.cerberus')
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios')
@patch('builtins.open', new_callable=mock_open)
@patch('time.time')
def test_run_successful(self, mock_time, mock_file, mock_general_scenarios, mock_common_funcs, mock_cerberus):
"""
Test successful run of node actions scenario
"""
scenario_yaml = {
"node_scenarios": [
{
"cloud_type": "generic",
"node_name": "test-node",
"actions": ["stop_kubelet_scenario"]
}
]
}
mock_file.return_value.__enter__.return_value.read.return_value = yaml.dump(scenario_yaml)
mock_time.side_effect = [1000, 1100]
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
mock_general_scenarios.return_value = mock_scenario_object
mock_common_funcs.get_node_by_name.return_value = ["test-node"]
mock_cerberus.get_status.return_value = None
with patch('yaml.full_load', return_value=scenario_yaml):
result = self.plugin.run(
"test-uuid",
"/path/to/scenario.yaml",
{},
self.mock_lib_telemetry,
self.mock_scenario_telemetry
)
self.assertEqual(result, 0)
mock_cerberus.get_status.assert_called_once_with({}, 1000, 1100)
@patch('logging.error')
@patch('builtins.open', new_callable=mock_open)
def test_run_with_exception(self, mock_file, mock_logging):
"""
Test run handles exceptions and returns 1
"""
scenario_yaml = {
"node_scenarios": [
{
"cloud_type": "unsupported"
}
]
}
with patch('yaml.full_load', return_value=scenario_yaml):
result = self.plugin.run(
"test-uuid",
"/path/to/scenario.yaml",
{},
self.mock_lib_telemetry,
self.mock_scenario_telemetry
)
self.assertEqual(result, 1)
mock_logging.assert_called()
@patch('logging.info')
def test_multiprocess_nodes(self, mock_logging):
"""
Test multiprocess_nodes executes run_node for multiple nodes in parallel
"""
nodes = ["node1", "node2", "node3"]
mock_scenario_object = Mock()
action = "restart_kubelet_scenario"
node_scenario = {}
with patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ThreadPool') as mock_pool:
mock_pool_instance = Mock()
mock_pool.return_value = mock_pool_instance
self.plugin.multiprocess_nodes(nodes, mock_scenario_object, action, node_scenario)
mock_pool.assert_called_once_with(processes=3)
mock_pool_instance.starmap.assert_called_once()
mock_pool_instance.close.assert_called_once()
@patch('logging.info')
def test_multiprocess_nodes_with_exception(self, mock_logging):
"""
Test multiprocess_nodes handles exceptions gracefully
"""
nodes = ["node1", "node2"]
mock_scenario_object = Mock()
action = "node_reboot_scenario"
node_scenario = {}
with patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ThreadPool') as mock_pool:
mock_pool.side_effect = Exception("Pool error")
self.plugin.multiprocess_nodes(nodes, mock_scenario_object, action, node_scenario)
mock_logging.assert_called()
self.assertIn("Error on pool multiprocessing", str(mock_logging.call_args))
if __name__ == "__main__":
unittest.main()