From ff3c4f53136f44b27e13152eb33838e850d73ed6 Mon Sep 17 00:00:00 2001 From: Paige Patton <64206430+paigerube14@users.noreply.github.com> Date: Mon, 22 Dec 2025 11:36:10 -0500 Subject: [PATCH] increasing node action coverage (#1010) Signed-off-by: Paige Patton --- tests/test_abstract_node_scenarios.py | 415 ++++++++++++ tests/test_common_node_functions.py | 476 ++++++++++++++ tests/test_node_actions_scenario_plugin.py | 710 ++++++++++++++++++++- 3 files changed, 1600 insertions(+), 1 deletion(-) create mode 100644 tests/test_abstract_node_scenarios.py create mode 100644 tests/test_common_node_functions.py diff --git a/tests/test_abstract_node_scenarios.py b/tests/test_abstract_node_scenarios.py new file mode 100644 index 00000000..1821a7d5 --- /dev/null +++ b/tests/test_abstract_node_scenarios.py @@ -0,0 +1,415 @@ +""" +Test suite for AbstractNode Scenarios + +Usage: + python -m coverage run -a -m unittest tests/test_abstract_node_scenarios.py + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import Mock, patch +from krkn.scenario_plugins.node_actions.abstract_node_scenarios import abstract_node_scenarios +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus + + +class TestAbstractNodeScenarios(unittest.TestCase): + """Test suite for abstract_node_scenarios class""" + + def setUp(self): + """Set up test fixtures before each test method""" + self.mock_kubecli = Mock(spec=KrknKubernetes) + self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus) + self.mock_affected_nodes_status.affected_nodes = [] + self.node_action_kube_check = True + + self.scenarios = abstract_node_scenarios( + kubecli=self.mock_kubecli, + node_action_kube_check=self.node_action_kube_check, + affected_nodes_status=self.mock_affected_nodes_status + ) + + def test_init(self): + """Test initialization of abstract_node_scenarios""" + self.assertEqual(self.scenarios.kubecli, self.mock_kubecli) + self.assertEqual(self.scenarios.affected_nodes_status, self.mock_affected_nodes_status) + self.assertTrue(self.scenarios.node_action_kube_check) + + @patch('time.sleep') + @patch('logging.info') + def test_node_stop_start_scenario(self, mock_logging, mock_sleep): + """Test node_stop_start_scenario calls stop and start in sequence""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + duration = 60 + poll_interval = 10 + + self.scenarios.node_stop_scenario = Mock() + self.scenarios.node_start_scenario = Mock() + + # Act + self.scenarios.node_stop_start_scenario( + instance_kill_count, node, timeout, duration, poll_interval + ) + + # Assert + self.scenarios.node_stop_scenario.assert_called_once_with( + instance_kill_count, node, timeout, poll_interval + ) + mock_sleep.assert_called_once_with(duration) + self.scenarios.node_start_scenario.assert_called_once_with( + instance_kill_count, node, timeout, poll_interval + ) + self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once() + + @patch('logging.info') + def test_helper_node_stop_start_scenario(self, mock_logging): + """Test helper_node_stop_start_scenario calls helper stop and start""" + # Arrange + instance_kill_count = 1 + node = "helper-node" + timeout = 300 + + self.scenarios.helper_node_stop_scenario = Mock() + self.scenarios.helper_node_start_scenario = Mock() + + # Act + self.scenarios.helper_node_stop_start_scenario(instance_kill_count, node, timeout) + + # Assert + self.scenarios.helper_node_stop_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + self.scenarios.helper_node_start_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + + @patch('time.sleep') + @patch('logging.info') + def test_node_disk_detach_attach_scenario_success(self, mock_logging, mock_sleep): + """Test disk detach/attach scenario with valid disk attachment""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + duration = 60 + disk_details = {"disk_id": "disk-123", "device": "/dev/sdb"} + + self.scenarios.get_disk_attachment_info = Mock(return_value=disk_details) + self.scenarios.disk_detach_scenario = Mock() + self.scenarios.disk_attach_scenario = Mock() + + # Act + self.scenarios.node_disk_detach_attach_scenario( + instance_kill_count, node, timeout, duration + ) + + # Assert + self.scenarios.get_disk_attachment_info.assert_called_once_with( + instance_kill_count, node + ) + self.scenarios.disk_detach_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + mock_sleep.assert_called_once_with(duration) + self.scenarios.disk_attach_scenario.assert_called_once_with( + instance_kill_count, disk_details, timeout + ) + + @patch('logging.error') + @patch('logging.info') + def test_node_disk_detach_attach_scenario_no_disk(self, mock_info, mock_error): + """Test disk detach/attach scenario when only root disk exists""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + duration = 60 + + self.scenarios.get_disk_attachment_info = Mock(return_value=None) + self.scenarios.disk_detach_scenario = Mock() + self.scenarios.disk_attach_scenario = Mock() + + # Act + self.scenarios.node_disk_detach_attach_scenario( + instance_kill_count, node, timeout, duration + ) + + # Assert + self.scenarios.disk_detach_scenario.assert_not_called() + self.scenarios.disk_attach_scenario.assert_not_called() + mock_error.assert_any_call("Node %s has only root disk attached" % node) + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.info') + def test_stop_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait): + """Test successful kubelet stop scenario""" + # Arrange + instance_kill_count = 2 + node = "test-node" + timeout = 300 + mock_affected_node = Mock(spec=AffectedNode) + mock_wait.return_value = None + + # Act + with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class: + mock_affected_node_class.return_value = mock_affected_node + self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout) + + # Assert + self.assertEqual(mock_run.call_count, 2) + expected_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet" + mock_run.assert_called_with(expected_command) + self.assertEqual(mock_wait.call_count, 2) + self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2) + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.error') + @patch('logging.info') + def test_stop_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait): + """Test kubelet stop scenario when command fails""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + error_msg = "Command failed" + mock_run.side_effect = Exception(error_msg) + + # Act & Assert + with self.assertRaises(Exception): + with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'): + self.scenarios.stop_kubelet_scenario(instance_kill_count, node, timeout) + + mock_error.assert_any_call( + "Failed to stop the kubelet of the node. Encountered following " + "exception: %s. Test Failed" % error_msg + ) + + @patch('logging.info') + def test_stop_start_kubelet_scenario(self, mock_logging): + """Test stop/start kubelet scenario""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + + self.scenarios.stop_kubelet_scenario = Mock() + self.scenarios.node_reboot_scenario = Mock() + + # Act + self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout) + + # Assert + self.scenarios.stop_kubelet_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + self.scenarios.node_reboot_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.info') + def test_restart_kubelet_scenario_success(self, mock_logging, mock_run, mock_wait): + """Test successful kubelet restart scenario""" + # Arrange + instance_kill_count = 2 + node = "test-node" + timeout = 300 + mock_affected_node = Mock(spec=AffectedNode) + mock_wait.return_value = None + + # Act + with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode') as mock_affected_node_class: + mock_affected_node_class.return_value = mock_affected_node + self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout) + + # Assert + self.assertEqual(mock_run.call_count, 2) + expected_command = "oc debug node/" + node + " -- chroot /host systemctl restart kubelet &" + mock_run.assert_called_with(expected_command) + self.assertEqual(mock_wait.call_count, 2) + self.assertEqual(len(self.mock_affected_nodes_status.affected_nodes), 2) + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_ready_status') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.error') + @patch('logging.info') + def test_restart_kubelet_scenario_failure(self, mock_info, mock_error, mock_run, mock_wait): + """Test kubelet restart scenario when command fails""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + error_msg = "Restart failed" + mock_run.side_effect = Exception(error_msg) + + # Act & Assert + with self.assertRaises(Exception): + with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'): + self.scenarios.restart_kubelet_scenario(instance_kill_count, node, timeout) + + mock_error.assert_any_call( + "Failed to restart the kubelet of the node. Encountered following " + "exception: %s. Test Failed" % error_msg + ) + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.info') + def test_node_crash_scenario_success(self, mock_logging, mock_run): + """Test successful node crash scenario""" + # Arrange + instance_kill_count = 2 + node = "test-node" + timeout = 300 + + # Act + result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout) + + # Assert + self.assertEqual(mock_run.call_count, 2) + expected_command = ( + "oc debug node/" + node + " -- chroot /host " + "dd if=/dev/urandom of=/proc/sysrq-trigger" + ) + mock_run.assert_called_with(expected_command) + self.assertIsNone(result) + + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + @patch('logging.error') + @patch('logging.info') + def test_node_crash_scenario_failure(self, mock_info, mock_error, mock_run): + """Test node crash scenario when command fails""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + error_msg = "Crash command failed" + mock_run.side_effect = Exception(error_msg) + + # Act + result = self.scenarios.node_crash_scenario(instance_kill_count, node, timeout) + + # Assert + self.assertEqual(result, 1) + mock_error.assert_any_call( + "Failed to crash the node. Encountered following exception: %s. " + "Test Failed" % error_msg + ) + + def test_node_start_scenario_not_implemented(self): + """Test that node_start_scenario returns None (not implemented)""" + result = self.scenarios.node_start_scenario(1, "test-node", 300, 10) + self.assertIsNone(result) + + def test_node_stop_scenario_not_implemented(self): + """Test that node_stop_scenario returns None (not implemented)""" + result = self.scenarios.node_stop_scenario(1, "test-node", 300, 10) + self.assertIsNone(result) + + def test_node_termination_scenario_not_implemented(self): + """Test that node_termination_scenario returns None (not implemented)""" + result = self.scenarios.node_termination_scenario(1, "test-node", 300, 10) + self.assertIsNone(result) + + def test_node_reboot_scenario_not_implemented(self): + """Test that node_reboot_scenario returns None (not implemented)""" + result = self.scenarios.node_reboot_scenario(1, "test-node", 300) + self.assertIsNone(result) + + def test_node_service_status_not_implemented(self): + """Test that node_service_status returns None (not implemented)""" + result = self.scenarios.node_service_status("test-node", "service", "key", 300) + self.assertIsNone(result) + + def test_node_block_scenario_not_implemented(self): + """Test that node_block_scenario returns None (not implemented)""" + result = self.scenarios.node_block_scenario(1, "test-node", 300, 60) + self.assertIsNone(result) + + +class TestAbstractNodeScenariosIntegration(unittest.TestCase): + """Integration tests for abstract_node_scenarios workflows""" + + def setUp(self): + """Set up test fixtures before each test method""" + self.mock_kubecli = Mock(spec=KrknKubernetes) + self.mock_affected_nodes_status = Mock(spec=AffectedNodeStatus) + self.mock_affected_nodes_status.affected_nodes = [] + + self.scenarios = abstract_node_scenarios( + kubecli=self.mock_kubecli, + node_action_kube_check=True, + affected_nodes_status=self.mock_affected_nodes_status + ) + + @patch('time.sleep') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.nodeaction.wait_for_unknown_status') + @patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.runcommand.run') + def test_complete_stop_start_kubelet_workflow(self, mock_run, mock_wait, mock_sleep): + """Test complete workflow of stop/start kubelet scenario""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + + self.scenarios.node_reboot_scenario = Mock() + + # Act + with patch('krkn.scenario_plugins.node_actions.abstract_node_scenarios.AffectedNode'): + self.scenarios.stop_start_kubelet_scenario(instance_kill_count, node, timeout) + + # Assert - verify stop kubelet was called + expected_stop_command = "oc debug node/" + node + " -- chroot /host systemctl stop kubelet" + mock_run.assert_any_call(expected_stop_command) + + # Verify reboot was called + self.scenarios.node_reboot_scenario.assert_called_once_with( + instance_kill_count, node, timeout + ) + + # Verify merge was called + self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once() + + @patch('time.sleep') + def test_node_stop_start_scenario_workflow(self, mock_sleep): + """Test complete workflow of node stop/start scenario""" + # Arrange + instance_kill_count = 1 + node = "test-node" + timeout = 300 + duration = 60 + poll_interval = 10 + + self.scenarios.node_stop_scenario = Mock() + self.scenarios.node_start_scenario = Mock() + + # Act + self.scenarios.node_stop_start_scenario( + instance_kill_count, node, timeout, duration, poll_interval + ) + + # Assert - verify order of operations + call_order = [] + + # Verify stop was called first + self.scenarios.node_stop_scenario.assert_called_once() + + # Verify sleep was called + mock_sleep.assert_called_once_with(duration) + + # Verify start was called after sleep + self.scenarios.node_start_scenario.assert_called_once() + + # Verify merge was called + self.mock_affected_nodes_status.merge_affected_nodes.assert_called_once() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_common_node_functions.py b/tests/test_common_node_functions.py new file mode 100644 index 00000000..17daa319 --- /dev/null +++ b/tests/test_common_node_functions.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python3 + +""" +Test suite for common_node_functions module + +Usage: + python -m coverage run -a -m unittest tests/test_common_node_functions.py -v + +Assisted By: Claude Code +""" + +import unittest +from unittest.mock import MagicMock, Mock, patch, call +import logging + +from krkn_lib.k8s import KrknKubernetes +from krkn_lib.models.k8s import AffectedNode + +from krkn.scenario_plugins.node_actions import common_node_functions + + +class TestCommonNodeFunctions(unittest.TestCase): + + def setUp(self): + """ + Set up test fixtures before each test + """ + self.mock_kubecli = Mock(spec=KrknKubernetes) + self.mock_affected_node = Mock(spec=AffectedNode) + + def test_get_node_by_name_all_nodes_exist(self): + """ + Test get_node_by_name returns list when all nodes exist + """ + node_name_list = ["node1", "node2", "node3"] + self.mock_kubecli.list_killable_nodes.return_value = ["node1", "node2", "node3", "node4"] + + result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli) + + self.assertEqual(result, node_name_list) + self.mock_kubecli.list_killable_nodes.assert_called_once() + + def test_get_node_by_name_single_node(self): + """ + Test get_node_by_name with single node + """ + node_name_list = ["worker-1"] + self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2"] + + result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli) + + self.assertEqual(result, node_name_list) + + @patch('logging.info') + def test_get_node_by_name_node_not_exist(self, mock_logging): + """ + Test get_node_by_name returns None when node doesn't exist + """ + node_name_list = ["node1", "nonexistent-node"] + self.mock_kubecli.list_killable_nodes.return_value = ["node1", "node2", "node3"] + + result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli) + + self.assertIsNone(result) + mock_logging.assert_called() + self.assertIn("does not exist", str(mock_logging.call_args)) + + @patch('logging.info') + def test_get_node_by_name_empty_killable_list(self, mock_logging): + """ + Test get_node_by_name when no killable nodes exist + """ + node_name_list = ["node1"] + self.mock_kubecli.list_killable_nodes.return_value = [] + + result = common_node_functions.get_node_by_name(node_name_list, self.mock_kubecli) + + self.assertIsNone(result) + mock_logging.assert_called() + + @patch('logging.info') + def test_get_node_single_label_selector(self, mock_logging): + """ + Test get_node with single label selector + """ + label_selector = "node-role.kubernetes.io/worker" + instance_kill_count = 2 + self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2", "worker-3"] + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(len(result), 2) + self.assertTrue(all(node in ["worker-1", "worker-2", "worker-3"] for node in result)) + self.mock_kubecli.list_killable_nodes.assert_called_once_with(label_selector) + mock_logging.assert_called() + + @patch('logging.info') + def test_get_node_multiple_label_selectors(self, mock_logging): + """ + Test get_node with multiple comma-separated label selectors + """ + label_selector = "node-role.kubernetes.io/worker,topology.kubernetes.io/zone=us-east-1a" + instance_kill_count = 3 + self.mock_kubecli.list_killable_nodes.side_effect = [ + ["worker-1", "worker-2"], + ["worker-3", "worker-4"] + ] + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(len(result), 3) + self.assertTrue(all(node in ["worker-1", "worker-2", "worker-3", "worker-4"] for node in result)) + self.assertEqual(self.mock_kubecli.list_killable_nodes.call_count, 2) + + @patch('logging.info') + def test_get_node_return_all_when_count_equals_total(self, mock_logging): + """ + Test get_node returns all nodes when instance_kill_count equals number of nodes + """ + label_selector = "node-role.kubernetes.io/worker" + nodes = ["worker-1", "worker-2", "worker-3"] + instance_kill_count = 3 + self.mock_kubecli.list_killable_nodes.return_value = nodes + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(result, nodes) + + @patch('logging.info') + def test_get_node_return_all_when_count_is_zero(self, mock_logging): + """ + Test get_node returns all nodes when instance_kill_count is 0 + """ + label_selector = "node-role.kubernetes.io/worker" + nodes = ["worker-1", "worker-2", "worker-3"] + instance_kill_count = 0 + self.mock_kubecli.list_killable_nodes.return_value = nodes + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(result, nodes) + + @patch('logging.info') + @patch('random.randint') + def test_get_node_random_selection(self, mock_randint, mock_logging): + """ + Test get_node randomly selects nodes when count is less than total + """ + label_selector = "node-role.kubernetes.io/worker" + instance_kill_count = 2 + self.mock_kubecli.list_killable_nodes.return_value = ["worker-1", "worker-2", "worker-3", "worker-4"] + # Mock random selection to return predictable values + mock_randint.side_effect = [1, 0] # Select index 1, then index 0 + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(len(result), 2) + # Verify nodes were removed after selection to avoid duplicates + self.assertEqual(len(set(result)), 2) + + def test_get_node_no_nodes_with_label(self): + """ + Test get_node raises exception when no nodes match label selector + """ + label_selector = "nonexistent-label" + instance_kill_count = 1 + self.mock_kubecli.list_killable_nodes.return_value = [] + + with self.assertRaises(Exception) as context: + common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertIn("Ready nodes with the provided label selector do not exist", str(context.exception)) + + def test_get_node_single_node_available(self): + """ + Test get_node when only one node is available + """ + label_selector = "node-role.kubernetes.io/master" + instance_kill_count = 1 + self.mock_kubecli.list_killable_nodes.return_value = ["master-1"] + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(result, ["master-1"]) + + def test_wait_for_ready_status_without_affected_node(self): + """ + Test wait_for_ready_status without providing affected_node + """ + node = "test-node" + timeout = 300 + expected_affected_node = Mock(spec=AffectedNode) + self.mock_kubecli.watch_node_status.return_value = expected_affected_node + + result = common_node_functions.wait_for_ready_status(node, timeout, self.mock_kubecli) + + self.assertEqual(result, expected_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with(node, "True", timeout, None) + + def test_wait_for_ready_status_with_affected_node(self): + """ + Test wait_for_ready_status with provided affected_node + """ + node = "test-node" + timeout = 300 + self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node + + result = common_node_functions.wait_for_ready_status( + node, timeout, self.mock_kubecli, self.mock_affected_node + ) + + self.assertEqual(result, self.mock_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with( + node, "True", timeout, self.mock_affected_node + ) + + def test_wait_for_not_ready_status_without_affected_node(self): + """ + Test wait_for_not_ready_status without providing affected_node + """ + node = "test-node" + timeout = 300 + expected_affected_node = Mock(spec=AffectedNode) + self.mock_kubecli.watch_node_status.return_value = expected_affected_node + + result = common_node_functions.wait_for_not_ready_status(node, timeout, self.mock_kubecli) + + self.assertEqual(result, expected_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with(node, "False", timeout, None) + + def test_wait_for_not_ready_status_with_affected_node(self): + """ + Test wait_for_not_ready_status with provided affected_node + """ + node = "test-node" + timeout = 300 + self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node + + result = common_node_functions.wait_for_not_ready_status( + node, timeout, self.mock_kubecli, self.mock_affected_node + ) + + self.assertEqual(result, self.mock_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with( + node, "False", timeout, self.mock_affected_node + ) + + def test_wait_for_unknown_status_without_affected_node(self): + """ + Test wait_for_unknown_status without providing affected_node + """ + node = "test-node" + timeout = 300 + expected_affected_node = Mock(spec=AffectedNode) + self.mock_kubecli.watch_node_status.return_value = expected_affected_node + + result = common_node_functions.wait_for_unknown_status(node, timeout, self.mock_kubecli) + + self.assertEqual(result, expected_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with(node, "Unknown", timeout, None) + + def test_wait_for_unknown_status_with_affected_node(self): + """ + Test wait_for_unknown_status with provided affected_node + """ + node = "test-node" + timeout = 300 + self.mock_kubecli.watch_node_status.return_value = self.mock_affected_node + + result = common_node_functions.wait_for_unknown_status( + node, timeout, self.mock_kubecli, self.mock_affected_node + ) + + self.assertEqual(result, self.mock_affected_node) + self.mock_kubecli.watch_node_status.assert_called_once_with( + node, "Unknown", timeout, self.mock_affected_node + ) + + @patch('time.sleep') + @patch('logging.info') + @patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient') + def test_check_service_status_success(self, mock_ssh_client, mock_logging, mock_sleep): + """ + Test check_service_status successfully checks service status + """ + node = "192.168.1.100" + service = ["neutron-server", "nova-compute"] + ssh_private_key = "~/.ssh/id_rsa" + timeout = 60 + + # Mock SSH client + mock_ssh = Mock() + mock_ssh_client.return_value = mock_ssh + mock_ssh.connect.return_value = None + + # Mock exec_command to return active status + mock_stdout = Mock() + mock_stdout.readlines.return_value = ["active\n"] + mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock()) + + common_node_functions.check_service_status(node, service, ssh_private_key, timeout) + + # Verify SSH connection was attempted + mock_ssh.connect.assert_called() + # Verify service status was checked for each service + self.assertEqual(mock_ssh.exec_command.call_count, 2) + # Verify SSH connection was closed + mock_ssh.close.assert_called_once() + + @patch('time.sleep') + @patch('logging.error') + @patch('logging.info') + @patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient') + def test_check_service_status_service_inactive(self, mock_ssh_client, mock_logging_info, mock_logging_error, mock_sleep): + """ + Test check_service_status logs error when service is inactive + """ + node = "192.168.1.100" + service = ["neutron-server"] + ssh_private_key = "~/.ssh/id_rsa" + timeout = 60 + + # Mock SSH client + mock_ssh = Mock() + mock_ssh_client.return_value = mock_ssh + mock_ssh.connect.return_value = None + + # Mock exec_command to return inactive status + mock_stdout = Mock() + mock_stdout.readlines.return_value = ["inactive\n"] + mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock()) + + common_node_functions.check_service_status(node, service, ssh_private_key, timeout) + + # Verify error was logged for inactive service + mock_logging_error.assert_called() + error_call_str = str(mock_logging_error.call_args) + self.assertIn("inactive", error_call_str) + mock_ssh.close.assert_called_once() + + @patch('time.sleep') + @patch('logging.error') + @patch('logging.info') + @patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient') + def test_check_service_status_ssh_connection_fails(self, mock_ssh_client, mock_logging_info, mock_logging_error, mock_sleep): + """ + Test check_service_status handles SSH connection failures + """ + node = "192.168.1.100" + service = ["neutron-server"] + ssh_private_key = "~/.ssh/id_rsa" + timeout = 5 + + # Mock SSH client to raise exception + mock_ssh = Mock() + mock_ssh_client.return_value = mock_ssh + mock_ssh.connect.side_effect = Exception("Connection timeout") + + # Mock exec_command for when connection eventually works (or doesn't) + mock_stdout = Mock() + mock_stdout.readlines.return_value = ["active\n"] + mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock()) + + common_node_functions.check_service_status(node, service, ssh_private_key, timeout) + + # Verify error was logged for SSH connection failure + mock_logging_error.assert_called() + error_call_str = str(mock_logging_error.call_args) + self.assertIn("Failed to ssh", error_call_str) + + @patch('time.sleep') + @patch('logging.info') + @patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient') + def test_check_service_status_multiple_services(self, mock_ssh_client, mock_logging, mock_sleep): + """ + Test check_service_status with multiple services + """ + node = "192.168.1.100" + service = ["service1", "service2", "service3"] + ssh_private_key = "~/.ssh/id_rsa" + timeout = 60 + + # Mock SSH client + mock_ssh = Mock() + mock_ssh_client.return_value = mock_ssh + mock_ssh.connect.return_value = None + + # Mock exec_command to return active status + mock_stdout = Mock() + mock_stdout.readlines.return_value = ["active\n"] + mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock()) + + common_node_functions.check_service_status(node, service, ssh_private_key, timeout) + + # Verify service status was checked for all services + self.assertEqual(mock_ssh.exec_command.call_count, 3) + mock_ssh.close.assert_called_once() + + @patch('time.sleep') + @patch('logging.info') + @patch('krkn.scenario_plugins.node_actions.common_node_functions.paramiko.SSHClient') + def test_check_service_status_retry_logic(self, mock_ssh_client, mock_logging, mock_sleep): + """ + Test check_service_status retry logic on connection failure then success + """ + node = "192.168.1.100" + service = ["neutron-server"] + ssh_private_key = "~/.ssh/id_rsa" + timeout = 10 + + # Mock SSH client + mock_ssh = Mock() + mock_ssh_client.return_value = mock_ssh + # First two attempts fail, third succeeds + mock_ssh.connect.side_effect = [ + Exception("Timeout"), + Exception("Timeout"), + None # Success + ] + + # Mock exec_command + mock_stdout = Mock() + mock_stdout.readlines.return_value = ["active\n"] + mock_ssh.exec_command.return_value = (Mock(), mock_stdout, Mock()) + + common_node_functions.check_service_status(node, service, ssh_private_key, timeout) + + # Verify multiple connection attempts were made + self.assertGreater(mock_ssh.connect.call_count, 1) + # Verify service was eventually checked + mock_ssh.exec_command.assert_called() + mock_ssh.close.assert_called_once() + + +class TestCommonNodeFunctionsIntegration(unittest.TestCase): + """Integration-style tests for common_node_functions""" + + def setUp(self): + """Set up test fixtures""" + self.mock_kubecli = Mock(spec=KrknKubernetes) + + @patch('logging.info') + def test_get_node_workflow_with_label_filtering(self, mock_logging): + """ + Test complete workflow of getting nodes with label selector and filtering + """ + label_selector = "node-role.kubernetes.io/worker" + instance_kill_count = 2 + available_nodes = ["worker-1", "worker-2", "worker-3", "worker-4", "worker-5"] + self.mock_kubecli.list_killable_nodes.return_value = available_nodes + + result = common_node_functions.get_node(label_selector, instance_kill_count, self.mock_kubecli) + + self.assertEqual(len(result), 2) + # Verify no duplicates + self.assertEqual(len(result), len(set(result))) + # Verify all nodes are from the available list + self.assertTrue(all(node in available_nodes for node in result)) + + @patch('logging.info') + def test_get_node_by_name_validation_workflow(self, mock_logging): + """ + Test complete workflow of validating node names + """ + requested_nodes = ["node-a", "node-b"] + killable_nodes = ["node-a", "node-b", "node-c", "node-d"] + self.mock_kubecli.list_killable_nodes.return_value = killable_nodes + + result = common_node_functions.get_node_by_name(requested_nodes, self.mock_kubecli) + + self.assertEqual(result, requested_nodes) + self.mock_kubecli.list_killable_nodes.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_node_actions_scenario_plugin.py b/tests/test_node_actions_scenario_plugin.py index 2db95d50..0fab5dcf 100644 --- a/tests/test_node_actions_scenario_plugin.py +++ b/tests/test_node_actions_scenario_plugin.py @@ -10,10 +10,15 @@ Assisted By: Claude Code """ import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock, patch, mock_open, call +import yaml +import tempfile +import os from krkn_lib.k8s import KrknKubernetes from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift +from krkn_lib.models.telemetry import ScenarioTelemetry +from krkn_lib.models.k8s import AffectedNodeStatus from krkn.scenario_plugins.node_actions.node_actions_scenario_plugin import NodeActionsScenarioPlugin @@ -24,7 +29,16 @@ class TestNodeActionsScenarioPlugin(unittest.TestCase): """ Set up test fixtures for NodeActionsScenarioPlugin """ + # Reset node_general global variable before each test + import krkn.scenario_plugins.node_actions.node_actions_scenario_plugin as plugin_module + plugin_module.node_general = False + self.plugin = NodeActionsScenarioPlugin() + self.mock_kubecli = Mock(spec=KrknKubernetes) + self.mock_lib_telemetry = Mock(spec=KrknTelemetryOpenshift) + self.mock_lib_telemetry.get_lib_kubernetes.return_value = self.mock_kubecli + self.mock_scenario_telemetry = Mock(spec=ScenarioTelemetry) + self.mock_scenario_telemetry.affected_nodes = [] def test_get_scenario_types(self): """ @@ -35,6 +49,700 @@ class TestNodeActionsScenarioPlugin(unittest.TestCase): self.assertEqual(result, ["node_scenarios"]) self.assertEqual(len(result), 1) + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios') + def test_get_node_scenario_object_generic(self, mock_general_scenarios): + """ + Test get_node_scenario_object returns general_node_scenarios for generic cloud type + """ + node_scenario = {"cloud_type": "generic"} + mock_general_instance = Mock() + mock_general_scenarios.return_value = mock_general_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_general_instance) + mock_general_scenarios.assert_called_once() + args = mock_general_scenarios.call_args[0] + self.assertEqual(args[0], self.mock_kubecli) + self.assertTrue(args[1]) # node_action_kube_check defaults to True + self.assertIsInstance(args[2], AffectedNodeStatus) + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios') + def test_get_node_scenario_object_no_cloud_type(self, mock_general_scenarios): + """ + Test get_node_scenario_object returns general_node_scenarios when cloud_type is not specified + """ + node_scenario = {} + mock_general_instance = Mock() + mock_general_scenarios.return_value = mock_general_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_general_instance) + mock_general_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.aws_node_scenarios') + def test_get_node_scenario_object_aws(self, mock_aws_scenarios): + """ + Test get_node_scenario_object returns aws_node_scenarios for AWS cloud type + """ + node_scenario = {"cloud_type": "aws"} + mock_aws_instance = Mock() + mock_aws_scenarios.return_value = mock_aws_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_aws_instance) + mock_aws_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.gcp_node_scenarios') + def test_get_node_scenario_object_gcp(self, mock_gcp_scenarios): + """ + Test get_node_scenario_object returns gcp_node_scenarios for GCP cloud type + """ + node_scenario = {"cloud_type": "gcp"} + mock_gcp_instance = Mock() + mock_gcp_scenarios.return_value = mock_gcp_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_gcp_instance) + mock_gcp_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.azure_node_scenarios') + def test_get_node_scenario_object_azure(self, mock_azure_scenarios): + """ + Test get_node_scenario_object returns azure_node_scenarios for Azure cloud type + """ + node_scenario = {"cloud_type": "azure"} + mock_azure_instance = Mock() + mock_azure_scenarios.return_value = mock_azure_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_azure_instance) + mock_azure_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.azure_node_scenarios') + def test_get_node_scenario_object_az(self, mock_azure_scenarios): + """ + Test get_node_scenario_object returns azure_node_scenarios for 'az' cloud type alias + """ + node_scenario = {"cloud_type": "az"} + mock_azure_instance = Mock() + mock_azure_scenarios.return_value = mock_azure_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_azure_instance) + mock_azure_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.docker_node_scenarios') + def test_get_node_scenario_object_docker(self, mock_docker_scenarios): + """ + Test get_node_scenario_object returns docker_node_scenarios for Docker cloud type + """ + node_scenario = {"cloud_type": "docker"} + mock_docker_instance = Mock() + mock_docker_scenarios.return_value = mock_docker_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_docker_instance) + mock_docker_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.vmware_node_scenarios') + def test_get_node_scenario_object_vmware(self, mock_vmware_scenarios): + """ + Test get_node_scenario_object returns vmware_node_scenarios for VMware cloud type + """ + node_scenario = {"cloud_type": "vmware"} + mock_vmware_instance = Mock() + mock_vmware_scenarios.return_value = mock_vmware_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_vmware_instance) + mock_vmware_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.vmware_node_scenarios') + def test_get_node_scenario_object_vsphere(self, mock_vmware_scenarios): + """ + Test get_node_scenario_object returns vmware_node_scenarios for vSphere cloud type alias + """ + node_scenario = {"cloud_type": "vsphere"} + mock_vmware_instance = Mock() + mock_vmware_scenarios.return_value = mock_vmware_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_vmware_instance) + mock_vmware_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibm_node_scenarios') + def test_get_node_scenario_object_ibm(self, mock_ibm_scenarios): + """ + Test get_node_scenario_object returns ibm_node_scenarios for IBM cloud type + """ + node_scenario = {"cloud_type": "ibm"} + mock_ibm_instance = Mock() + mock_ibm_scenarios.return_value = mock_ibm_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_ibm_instance) + mock_ibm_scenarios.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibm_node_scenarios') + def test_get_node_scenario_object_ibmcloud(self, mock_ibm_scenarios): + """ + Test get_node_scenario_object returns ibm_node_scenarios for ibmcloud cloud type alias + """ + node_scenario = {"cloud_type": "ibmcloud", "disable_ssl_verification": False} + mock_ibm_instance = Mock() + mock_ibm_scenarios.return_value = mock_ibm_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_ibm_instance) + args = mock_ibm_scenarios.call_args[0] + self.assertFalse(args[3]) # disable_ssl_verification should be False + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ibmcloud_power_node_scenarios') + def test_get_node_scenario_object_ibmpower(self, mock_ibmpower_scenarios): + """ + Test get_node_scenario_object returns ibmcloud_power_node_scenarios for ibmpower cloud type + """ + node_scenario = {"cloud_type": "ibmpower"} + mock_ibmpower_instance = Mock() + mock_ibmpower_scenarios.return_value = mock_ibmpower_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_ibmpower_instance) + mock_ibmpower_scenarios.assert_called_once() + + def test_get_node_scenario_object_openstack(self): + """ + Test get_node_scenario_object returns openstack_node_scenarios for OpenStack cloud type + """ + with patch('krkn.scenario_plugins.node_actions.openstack_node_scenarios.openstack_node_scenarios') as mock_openstack: + node_scenario = {"cloud_type": "openstack"} + mock_openstack_instance = Mock() + mock_openstack.return_value = mock_openstack_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_openstack_instance) + mock_openstack.assert_called_once() + + def test_get_node_scenario_object_alibaba(self): + """ + Test get_node_scenario_object returns alibaba_node_scenarios for Alibaba cloud type + """ + with patch('krkn.scenario_plugins.node_actions.alibaba_node_scenarios.alibaba_node_scenarios') as mock_alibaba: + node_scenario = {"cloud_type": "alibaba"} + mock_alibaba_instance = Mock() + mock_alibaba.return_value = mock_alibaba_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_alibaba_instance) + mock_alibaba.assert_called_once() + + def test_get_node_scenario_object_alicloud(self): + """ + Test get_node_scenario_object returns alibaba_node_scenarios for alicloud alias + """ + with patch('krkn.scenario_plugins.node_actions.alibaba_node_scenarios.alibaba_node_scenarios') as mock_alibaba: + node_scenario = {"cloud_type": "alicloud"} + mock_alibaba_instance = Mock() + mock_alibaba.return_value = mock_alibaba_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_alibaba_instance) + mock_alibaba.assert_called_once() + + def test_get_node_scenario_object_bm(self): + """ + Test get_node_scenario_object returns bm_node_scenarios for bare metal cloud type + """ + with patch('krkn.scenario_plugins.node_actions.bm_node_scenarios.bm_node_scenarios') as mock_bm: + node_scenario = { + "cloud_type": "bm", + "bmc_info": "192.168.1.1", + "bmc_user": "admin", + "bmc_password": "password" + } + mock_bm_instance = Mock() + mock_bm.return_value = mock_bm_instance + + result = self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertEqual(result, mock_bm_instance) + args = mock_bm.call_args[0] + self.assertEqual(args[0], "192.168.1.1") + self.assertEqual(args[1], "admin") + self.assertEqual(args[2], "password") + + def test_get_node_scenario_object_unsupported_cloud(self): + """ + Test get_node_scenario_object raises exception for unsupported cloud type + """ + node_scenario = {"cloud_type": "unsupported_cloud"} + + with self.assertRaises(Exception) as context: + self.plugin.get_node_scenario_object(node_scenario, self.mock_kubecli) + + self.assertIn("not currently supported", str(context.exception)) + self.assertIn("unsupported_cloud", str(context.exception)) + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions') + def test_inject_node_scenario_with_node_name(self, mock_common_funcs): + """ + Test inject_node_scenario with specific node name + """ + node_scenario = { + "node_name": "node1,node2", + "instance_count": 2, + "runs": 1, + "timeout": 120, + "duration": 60, + "poll_interval": 15 + } + action = "node_stop_start_scenario" + mock_scenario_object = Mock() + mock_scenario_object.affected_nodes_status = AffectedNodeStatus() + mock_scenario_object.affected_nodes_status.affected_nodes = [] + + mock_common_funcs.get_node_by_name.return_value = ["node1", "node2"] + + self.plugin.inject_node_scenario( + action, + node_scenario, + mock_scenario_object, + self.mock_kubecli, + self.mock_scenario_telemetry + ) + + mock_common_funcs.get_node_by_name.assert_called_once_with(["node1", "node2"], self.mock_kubecli) + self.assertEqual(mock_scenario_object.node_stop_start_scenario.call_count, 2) + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions') + def test_inject_node_scenario_with_label_selector(self, mock_common_funcs): + """ + Test inject_node_scenario with label selector + """ + node_scenario = { + "label_selector": "node-role.kubernetes.io/worker", + "instance_count": 1 + } + action = "node_reboot_scenario" + mock_scenario_object = Mock() + mock_scenario_object.affected_nodes_status = AffectedNodeStatus() + mock_scenario_object.affected_nodes_status.affected_nodes = [] + + mock_common_funcs.get_node.return_value = ["worker-node-1"] + + self.plugin.inject_node_scenario( + action, + node_scenario, + mock_scenario_object, + self.mock_kubecli, + self.mock_scenario_telemetry + ) + + mock_common_funcs.get_node.assert_called_once_with("node-role.kubernetes.io/worker", 1, self.mock_kubecli) + mock_scenario_object.node_reboot_scenario.assert_called_once() + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions') + def test_inject_node_scenario_with_exclude_label(self, mock_common_funcs): + """ + Test inject_node_scenario with exclude label + """ + node_scenario = { + "label_selector": "node-role.kubernetes.io/worker", + "exclude_label": "node-role.kubernetes.io/master", + "instance_count": 2 + } + action = "node_stop_scenario" + mock_scenario_object = Mock() + mock_scenario_object.affected_nodes_status = AffectedNodeStatus() + mock_scenario_object.affected_nodes_status.affected_nodes = [] + + mock_common_funcs.get_node.side_effect = [ + ["worker-1", "master-1"], + ["master-1"] + ] + + self.plugin.inject_node_scenario( + action, + node_scenario, + mock_scenario_object, + self.mock_kubecli, + self.mock_scenario_telemetry + ) + + self.assertEqual(mock_common_funcs.get_node.call_count, 2) + # Should only process worker-1 after excluding master-1 + self.assertEqual(mock_scenario_object.node_stop_scenario.call_count, 1) + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions') + def test_inject_node_scenario_parallel_mode(self, mock_common_funcs): + """ + Test inject_node_scenario with parallel processing + """ + node_scenario = { + "node_name": "node1,node2,node3", + "parallel": True + } + action = "restart_kubelet_scenario" + mock_scenario_object = Mock() + mock_scenario_object.affected_nodes_status = AffectedNodeStatus() + mock_scenario_object.affected_nodes_status.affected_nodes = [] + + mock_common_funcs.get_node_by_name.return_value = ["node1", "node2", "node3"] + + with patch.object(self.plugin, 'multiprocess_nodes') as mock_multiprocess: + self.plugin.inject_node_scenario( + action, + node_scenario, + mock_scenario_object, + self.mock_kubecli, + self.mock_scenario_telemetry + ) + + mock_multiprocess.assert_called_once() + args = mock_multiprocess.call_args[0] + self.assertEqual(args[0], ["node1", "node2", "node3"]) + self.assertEqual(args[2], action) + + def test_run_node_node_start_scenario(self): + """ + Test run_node executes node_start_scenario action + """ + node_scenario = {"runs": 2, "timeout": 300, "poll_interval": 10} + action = "node_start_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_start_scenario.assert_called_once_with(2, "test-node", 300, 10) + + def test_run_node_node_stop_scenario(self): + """ + Test run_node executes node_stop_scenario action + """ + node_scenario = {"runs": 1, "timeout": 120, "poll_interval": 15} + action = "node_stop_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_stop_scenario.assert_called_once_with(1, "test-node", 120, 15) + + def test_run_node_node_stop_start_scenario(self): + """ + Test run_node executes node_stop_start_scenario action + """ + node_scenario = {"runs": 1, "timeout": 120, "duration": 60, "poll_interval": 15} + action = "node_stop_start_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_stop_start_scenario.assert_called_once_with(1, "test-node", 120, 60, 15) + + def test_run_node_node_termination_scenario(self): + """ + Test run_node executes node_termination_scenario action + """ + node_scenario = {} + action = "node_termination_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_termination_scenario.assert_called_once_with(1, "test-node", 120, 15) + + def test_run_node_node_reboot_scenario(self): + """ + Test run_node executes node_reboot_scenario action + """ + node_scenario = {"soft_reboot": True} + action = "node_reboot_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_reboot_scenario.assert_called_once_with(1, "test-node", 120, True) + + def test_run_node_node_disk_detach_attach_scenario(self): + """ + Test run_node executes node_disk_detach_attach_scenario action + """ + node_scenario = {"duration": 90} + action = "node_disk_detach_attach_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_disk_detach_attach_scenario.assert_called_once_with(1, "test-node", 120, 90) + + def test_run_node_stop_start_kubelet_scenario(self): + """ + Test run_node executes stop_start_kubelet_scenario action + """ + node_scenario = {} + action = "stop_start_kubelet_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.stop_start_kubelet_scenario.assert_called_once_with(1, "test-node", 120) + + def test_run_node_restart_kubelet_scenario(self): + """ + Test run_node executes restart_kubelet_scenario action + """ + node_scenario = {} + action = "restart_kubelet_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.restart_kubelet_scenario.assert_called_once_with(1, "test-node", 120) + + def test_run_node_stop_kubelet_scenario(self): + """ + Test run_node executes stop_kubelet_scenario action + """ + node_scenario = {} + action = "stop_kubelet_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.stop_kubelet_scenario.assert_called_once_with(1, "test-node", 120) + + def test_run_node_node_crash_scenario(self): + """ + Test run_node executes node_crash_scenario action + """ + node_scenario = {} + action = "node_crash_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_crash_scenario.assert_called_once_with(1, "test-node", 120) + + def test_run_node_node_block_scenario(self): + """ + Test run_node executes node_block_scenario action + """ + node_scenario = {"duration": 100} + action = "node_block_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.node_block_scenario.assert_called_once_with(1, "test-node", 120, 100) + + @patch('logging.info') + def test_run_node_stop_start_helper_node_scenario_openstack(self, mock_logging): + """ + Test run_node executes stop_start_helper_node_scenario for OpenStack + """ + node_scenario = { + "cloud_type": "openstack", + "helper_node_ip": "192.168.1.100", + "service": "neutron-server" + } + action = "stop_start_helper_node_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_scenario_object.helper_node_stop_start_scenario.assert_called_once_with(1, "192.168.1.100", 120) + mock_scenario_object.helper_node_service_status.assert_called_once() + + @patch('logging.error') + def test_run_node_stop_start_helper_node_scenario_non_openstack(self, mock_logging): + """ + Test run_node logs error for stop_start_helper_node_scenario on non-OpenStack + """ + node_scenario = { + "cloud_type": "aws", + "helper_node_ip": "192.168.1.100" + } + action = "stop_start_helper_node_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_logging.assert_called() + self.assertIn("not supported", str(mock_logging.call_args)) + + @patch('logging.error') + def test_run_node_stop_start_helper_node_scenario_missing_ip(self, mock_logging): + """ + Test run_node raises exception when helper_node_ip is missing + """ + node_scenario = { + "cloud_type": "openstack", + "helper_node_ip": None + } + action = "stop_start_helper_node_scenario" + mock_scenario_object = Mock() + + with self.assertRaises(Exception) as context: + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + self.assertIn("Helper node IP address is not provided", str(context.exception)) + + @patch('logging.info') + def test_run_node_generic_cloud_skip_unsupported_action(self, mock_logging): + """ + Test run_node skips unsupported actions for generic cloud type + """ + # Set node_general to True for this test + import krkn.scenario_plugins.node_actions.node_actions_scenario_plugin as plugin_module + plugin_module.node_general = True + + node_scenario = {} + action = "node_stop_scenario" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_logging.assert_called() + self.assertIn("not set up for generic cloud type", str(mock_logging.call_args)) + + @patch('logging.info') + def test_run_node_unknown_action(self, mock_logging): + """ + Test run_node logs info for unknown action + """ + node_scenario = {} + action = "unknown_action" + mock_scenario_object = Mock() + + self.plugin.run_node("test-node", mock_scenario_object, action, node_scenario) + + mock_logging.assert_called() + # Could be either message depending on node_general state + call_str = str(mock_logging.call_args) + self.assertTrue( + "no node action that matches" in call_str or + "not set up for generic cloud type" in call_str + ) + + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.cerberus') + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions') + @patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.general_node_scenarios') + @patch('builtins.open', new_callable=mock_open) + @patch('time.time') + def test_run_successful(self, mock_time, mock_file, mock_general_scenarios, mock_common_funcs, mock_cerberus): + """ + Test successful run of node actions scenario + """ + scenario_yaml = { + "node_scenarios": [ + { + "cloud_type": "generic", + "node_name": "test-node", + "actions": ["stop_kubelet_scenario"] + } + ] + } + + mock_file.return_value.__enter__.return_value.read.return_value = yaml.dump(scenario_yaml) + mock_time.side_effect = [1000, 1100] + mock_scenario_object = Mock() + mock_scenario_object.affected_nodes_status = AffectedNodeStatus() + mock_scenario_object.affected_nodes_status.affected_nodes = [] + mock_general_scenarios.return_value = mock_scenario_object + mock_common_funcs.get_node_by_name.return_value = ["test-node"] + mock_cerberus.get_status.return_value = None + + with patch('yaml.full_load', return_value=scenario_yaml): + result = self.plugin.run( + "test-uuid", + "/path/to/scenario.yaml", + {}, + self.mock_lib_telemetry, + self.mock_scenario_telemetry + ) + + self.assertEqual(result, 0) + mock_cerberus.get_status.assert_called_once_with({}, 1000, 1100) + + @patch('logging.error') + @patch('builtins.open', new_callable=mock_open) + def test_run_with_exception(self, mock_file, mock_logging): + """ + Test run handles exceptions and returns 1 + """ + scenario_yaml = { + "node_scenarios": [ + { + "cloud_type": "unsupported" + } + ] + } + + with patch('yaml.full_load', return_value=scenario_yaml): + result = self.plugin.run( + "test-uuid", + "/path/to/scenario.yaml", + {}, + self.mock_lib_telemetry, + self.mock_scenario_telemetry + ) + + self.assertEqual(result, 1) + mock_logging.assert_called() + + @patch('logging.info') + def test_multiprocess_nodes(self, mock_logging): + """ + Test multiprocess_nodes executes run_node for multiple nodes in parallel + """ + nodes = ["node1", "node2", "node3"] + mock_scenario_object = Mock() + action = "restart_kubelet_scenario" + node_scenario = {} + + with patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ThreadPool') as mock_pool: + mock_pool_instance = Mock() + mock_pool.return_value = mock_pool_instance + + self.plugin.multiprocess_nodes(nodes, mock_scenario_object, action, node_scenario) + + mock_pool.assert_called_once_with(processes=3) + mock_pool_instance.starmap.assert_called_once() + mock_pool_instance.close.assert_called_once() + + @patch('logging.info') + def test_multiprocess_nodes_with_exception(self, mock_logging): + """ + Test multiprocess_nodes handles exceptions gracefully + """ + nodes = ["node1", "node2"] + mock_scenario_object = Mock() + action = "node_reboot_scenario" + node_scenario = {} + + with patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.ThreadPool') as mock_pool: + mock_pool.side_effect = Exception("Pool error") + + self.plugin.multiprocess_nodes(nodes, mock_scenario_object, action, node_scenario) + + mock_logging.assert_called() + self.assertIn("Error on pool multiprocessing", str(mock_logging.call_args)) + if __name__ == "__main__": unittest.main()