diff --git a/config/config.yaml b/config/config.yaml index feb1f8cd..4ee729a7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -14,6 +14,7 @@ kraken: - plugin_scenarios: - scenarios/openshift/etcd.yml - scenarios/openshift/regex_openshift_pod_kill.yml + - scenarios/openshift/vmware_node_scenarios.yml - node_scenarios: # List of chaos node scenarios to load - scenarios/openshift/node_scenarios_example.yml - plugin_scenarios: diff --git a/docs/cloud_setup.md b/docs/cloud_setup.md index c6ed1491..17f4d7bc 100644 --- a/docs/cloud_setup.md +++ b/docs/cloud_setup.md @@ -5,6 +5,7 @@ Supported Cloud Providers: * [Openstack](#openstack) * [Azure](#azure) * [Alibaba](#alibaba) +* [VMware](#vmware) ## AWS @@ -53,3 +54,15 @@ See the [Installation guide](https://www.alibabacloud.com/help/en/alibaba-cloud- Refer to [region and zone page](https://www.alibabacloud.com/help/en/elastic-compute-service/latest/regions-and-zones#concept-2459516) to get the region id for the region you are running on. Set cloud_type to either alibaba or alicloud in your node scenario yaml file. + +## VMware + +Set the following environment variables: + +1. ```export VSPHERE_IP=``` + +2. ```export VSPHERE_USERNAME=``` + +3. ```export VSPHERE_PASSWORD=``` + +These are the credentials that you would normally use to access the vSphere client. \ No newline at end of file diff --git a/docs/node_scenarios.md b/docs/node_scenarios.md index 9ef1b066..88414c79 100644 --- a/docs/node_scenarios.md +++ b/docs/node_scenarios.md @@ -64,6 +64,10 @@ How to set up Alibaba cli to run node scenarios is defined [here](cloud_setup.md . Releasing a node is 2 steps, stopping the node and then releasing it. +#### VMware +How to set up VMware vSphere to run node scenarios is defined [here](cloud_setup.md#vmware). + + #### General **NOTE**: The `node_crash_scenario` and `stop_kubelet_scenario` scenario is supported independent of the cloud platform.
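For a quick sanity check of the credentials described in docs/cloud_setup.md above, a minimal sketch along the following lines can be used. It assumes the vsphere-automation-sdk-python dependency added in requirements.txt further down; the datacenter listing mirrors what the new vmware_plugin.py does internally and is only illustrative.

```python
from os import environ

from vmware.vapi.vsphere.client import create_vsphere_client

# Same variables as documented above; the plugin raises an exception if any are missing.
server = environ.get("VSPHERE_IP")
username = environ.get("VSPHERE_USERNAME")
password = environ.get("VSPHERE_PASSWORD")
if not (server and username and password):
    raise Exception("VSPHERE_IP, VSPHERE_USERNAME and VSPHERE_PASSWORD must be set")

client = create_vsphere_client(server=server, username=username, password=password)
# Listing datacenters is a cheap way to confirm the session works.
print([datacenter.name for datacenter in client.vcenter.Datacenter.list()])
```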
diff --git a/kraken/kubernetes/client.py b/kraken/kubernetes/client.py index 80cea369..2366606e 100644 --- a/kraken/kubernetes/client.py +++ b/kraken/kubernetes/client.py @@ -579,7 +579,7 @@ def watch_node_status(node, status, timeout, resource_version): cli.list_node, field_selector=f"metadata.name={node}", timeout_seconds=timeout, - resource_version=f"{resource_version}", + resource_version=f"{resource_version}" ): conditions = [status for status in event["object"].status.conditions if status.type == "Ready"] if conditions[0].status == status: diff --git a/kraken/plugins/__init__.py b/kraken/plugins/__init__.py index 2c07a4de..0b0498b2 100644 --- a/kraken/plugins/__init__.py +++ b/kraken/plugins/__init__.py @@ -5,7 +5,7 @@ from os.path import abspath from typing import List, Dict from arcaflow_plugin_sdk import schema, serialization, jsonschema - +import kraken.plugins.vmware.vmware_plugin as vmware_plugin from kraken.plugins.pod_plugin import kill_pods, wait_for_pods from kraken.plugins.run_python_plugin import run_python_file @@ -153,6 +153,30 @@ PLUGINS = Plugins( [ "error" ] + ), + PluginStep( + vmware_plugin.node_start, + [ + "error" + ] + ), + PluginStep( + vmware_plugin.node_stop, + [ + "error" + ] + ), + PluginStep( + vmware_plugin.node_reboot, + [ + "error" + ] + ), + PluginStep( + vmware_plugin.node_terminate, + [ + "error" + ] ) ] ) diff --git a/kraken/plugins/vmware/kubernetes_functions.py b/kraken/plugins/vmware/kubernetes_functions.py new file mode 100644 index 00000000..22038cac --- /dev/null +++ b/kraken/plugins/vmware/kubernetes_functions.py @@ -0,0 +1,179 @@ +from kubernetes import config, client +from kubernetes.client.rest import ApiException +import logging +import random +from enum import Enum + + +class Actions(Enum): + """ + This enumeration indicates different kinds of node operations + """ + + START = "Start" + STOP = "Stop" + TERMINATE = "Terminate" + REBOOT = "Reboot" + + +def setup_kubernetes(kubeconfig_path): + """ + Sets up the Kubernetes client + """ + + if kubeconfig_path is None: + kubeconfig_path = config.KUBE_CONFIG_DEFAULT_LOCATION + kubeconfig = config.kube_config.KubeConfigMerger(kubeconfig_path) + + if kubeconfig.config is None: + raise Exception( + "Invalid kube-config file: %s. " "No configuration found." 
% kubeconfig_path + ) + loader = config.kube_config.KubeConfigLoader( + config_dict=kubeconfig.config, + ) + client_config = client.Configuration() + loader.load_and_set(client_config) + return client.ApiClient(configuration=client_config) + + +def list_killable_nodes(core_v1, label_selector=None): + """ + Returns a list of nodes that can be stopped/reset/released + """ + + nodes = [] + try: + if label_selector: + ret = core_v1.list_node(pretty=True, label_selector=label_selector) + else: + ret = core_v1.list_node(pretty=True) + except ApiException as e: + logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) + raise e + for node in ret.items: + for cond in node.status.conditions: + if str(cond.type) == "Ready" and str(cond.status) == "True": + nodes.append(node.metadata.name) + return nodes + + +def list_startable_nodes(core_v1, label_selector=None): + """ + Returns a list of nodes that can be started + """ + + nodes = [] + try: + if label_selector: + ret = core_v1.list_node(pretty=True, label_selector=label_selector) + else: + ret = core_v1.list_node(pretty=True) + except ApiException as e: + logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) + raise e + for node in ret.items: + for cond in node.status.conditions: + if str(cond.type) == "Ready" and str(cond.status) != "True": + nodes.append(node.metadata.name) + return nodes + + +def get_node_list(cfg, action, core_v1): + """ + Returns a list of nodes to be used in the node scenarios. The list returned is constructed as follows: + - If the key 'name' is present in the node scenario config, the value is extracted and split into + a list + - Each node in the list is fed to the get_node function which checks if the node is killable or + fetches the node using the label selector + """ + + def get_node(node_name, label_selector, instance_kill_count, action, core_v1): + list_nodes_func = ( + list_startable_nodes if action == Actions.START else list_killable_nodes + ) + if node_name in list_nodes_func(core_v1): + return [node_name] + elif node_name: + logging.info( + "Node with provided node_name does not exist or the node might " + "be in NotReady state." 
+ ) + nodes = list_nodes_func(core_v1, label_selector) + if not nodes: + raise Exception("Ready nodes with the provided label selector do not exist") + logging.info( + "Ready nodes with the label selector %s: %s" % (label_selector, nodes) + ) + number_of_nodes = len(nodes) + if instance_kill_count == number_of_nodes: + return nodes + nodes_to_return = [] + for i in range(instance_kill_count): + node_to_add = nodes[random.randint(0, len(nodes) - 1)] + nodes_to_return.append(node_to_add) + nodes.remove(node_to_add) + return nodes_to_return + + if cfg.name: + input_nodes = cfg.name.split(",") + else: + input_nodes = [""] + scenario_nodes = set() + + if cfg.skip_openshift_checks: + scenario_nodes = input_nodes + else: + for node in input_nodes: + nodes = get_node( + node, cfg.label_selector, cfg.instance_count, action, core_v1 + ) + scenario_nodes.update(nodes) + + return list(scenario_nodes) + + +def watch_node_status(node, status, timeout, watch_resource, core_v1): + """ + Monitor the status of a node for change + """ + count = timeout + for event in watch_resource.stream( + core_v1.list_node, + field_selector=f"metadata.name={node}", + timeout_seconds=timeout, + ): + conditions = [ + status + for status in event["object"].status.conditions + if status.type == "Ready" + ] + if conditions[0].status == status: + watch_resource.stop() + break + else: + count -= 1 + logging.info("Status of node " + node + ": " + str(conditions[0].status)) + if not count: + watch_resource.stop() + + +def wait_for_ready_status(node, timeout, watch_resource, core_v1): + """ + Wait until the node status becomes Ready + """ + watch_node_status(node, "True", timeout, watch_resource, core_v1) + + +def wait_for_not_ready_status(node, timeout, watch_resource, core_v1): + """ + Wait until the node status becomes Not Ready + """ + watch_node_status(node, "False", timeout, watch_resource, core_v1) + + +def wait_for_unknown_status(node, timeout, watch_resource, core_v1): + """ + Wait until the node status becomes Unknown + """ + watch_node_status(node, "Unknown", timeout, watch_resource, core_v1) diff --git a/kraken/plugins/vmware/test_vmware_plugin.py b/kraken/plugins/vmware/test_vmware_plugin.py new file mode 100644 index 00000000..b8fea27b --- /dev/null +++ b/kraken/plugins/vmware/test_vmware_plugin.py @@ -0,0 +1,119 @@ +import unittest +import os +import os +import logging +from arcaflow_plugin_sdk import plugin +from kraken.plugins.vmware.kubernetes_functions import Actions +from kraken.plugins.vmware import vmware_plugin + + +class NodeScenariosTest(unittest.TestCase): + def setUp(self): + vsphere_env_vars = ["VSPHERE_IP", "VSPHERE_USERNAME", "VSPHERE_PASSWORD"] + self.credentials_present = all( + env_var in os.environ for env_var in vsphere_env_vars + ) + + def test_serialization(self): + plugin.test_object_serialization( + vmware_plugin.NodeScenarioConfig(name="test", skip_openshift_checks=True), + self.fail, + ) + plugin.test_object_serialization( + vmware_plugin.NodeScenarioSuccessOutput( + nodes={}, action=Actions.START + ), + self.fail, + ) + plugin.test_object_serialization( + vmware_plugin.NodeScenarioErrorOutput( + error="Hello World", action=Actions.START + ), + self.fail, + ) + + def test_node_start(self): + if not self.credentials_present: + self.skipTest( + "Check if the environmental variables 'VSPHERE_IP', 'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set" + ) + vsphere = vmware_plugin.vSphere(verify=False) + vm_id, vm_name = vsphere.create_default_vm() + if vm_id is None: + self.fail("Could not create test 
VM") + + output_id, output_data = vmware_plugin.node_start( + vmware_plugin.NodeScenarioConfig( + name=vm_name, skip_openshift_checks=True, verify_session=False + ) + ) + if output_id == "error": + logging.error(output_data.error) + self.fail("The VMware VM did not start because an error occurred") + vsphere.release_instances(vm_name) + + def test_node_stop(self): + if not self.credentials_present: + self.skipTest( + "Check if the environmental variables 'VSPHERE_IP', 'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set" + ) + vsphere = vmware_plugin.vSphere(verify=False) + vm_id, vm_name = vsphere.create_default_vm() + if vm_id is None: + self.fail("Could not create test VM") + vsphere.start_instances(vm_name) + + output_id, output_data = vmware_plugin.node_stop( + vmware_plugin.NodeScenarioConfig( + name=vm_name, skip_openshift_checks=True, verify_session=False + ) + ) + if output_id == "error": + logging.error(output_data.error) + self.fail("The VMware VM did not stop because an error occurred") + vsphere.release_instances(vm_name) + + def test_node_reboot(self): + if not self.credentials_present: + self.skipTest( + "Check if the environmental variables 'VSPHERE_IP', 'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set" + ) + vsphere = vmware_plugin.vSphere(verify=False) + vm_id, vm_name = vsphere.create_default_vm() + if vm_id is None: + self.fail("Could not create test VM") + vsphere.start_instances(vm_name) + + output_id, output_data = vmware_plugin.node_reboot( + vmware_plugin.NodeScenarioConfig( + name=vm_name, skip_openshift_checks=True, verify_session=False + ) + ) + if output_id == "error": + logging.error(output_data.error) + self.fail("The VMware VM did not reboot because an error occurred") + vsphere.release_instances(vm_name) + + def test_node_terminate(self): + if not self.credentials_present: + self.skipTest( + "Check if the environmental variables 'VSPHERE_IP', 'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set" + ) + vsphere = vmware_plugin.vSphere(verify=False) + vm_id, vm_name = vsphere.create_default_vm() + if vm_id is None: + self.fail("Could not create test VM") + vsphere.start_instances(vm_name) + + output_id, output_data = vmware_plugin.node_terminate( + vmware_plugin.NodeScenarioConfig( + name=vm_name, skip_openshift_checks=True, verify_session=False + ) + ) + if output_id == "error": + logging.error(output_data.error) + self.fail("The VMware VM did not reboot because an error occurred") + + +if __name__ == "__main__": + unittest.main() diff --git a/kraken/plugins/vmware/vmware_plugin.py b/kraken/plugins/vmware/vmware_plugin.py new file mode 100644 index 00000000..4b13d94f --- /dev/null +++ b/kraken/plugins/vmware/vmware_plugin.py @@ -0,0 +1,651 @@ +#!/usr/bin/env python +import sys +import time +import typing +from os import environ +from dataclasses import dataclass, field +import random +from traceback import format_exc +import logging +from kraken.plugins.vmware import kubernetes_functions as kube_helper +from com.vmware.vcenter_client import ResourcePool +from arcaflow_plugin_sdk import validation, plugin +from kubernetes import client, watch +from vmware.vapi.vsphere.client import create_vsphere_client +from com.vmware.vcenter_client import VM +from com.vmware.vcenter.vm_client import Power +from com.vmware.vapi.std.errors_client import ( + AlreadyInDesiredState, + NotAllowedInCurrentState, +) +import requests +import sys + + +class vSphere: + def __init__(self, verify=True): + """ + Initialize the vSphere client by using the the env variables: + 'VSPHERE_IP', 
'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' + """ + self.server = environ.get("VSPHERE_IP") + self.username = environ.get("VSPHERE_USERNAME") + self.password = environ.get("VSPHERE_PASSWORD") + session = self.get_unverified_session() if not verify else None + self.credentials_present = ( + True if self.server and self.username and self.password else False + ) + if not self.credentials_present: + raise Exception( + "Environmental variables 'VSPHERE_IP', 'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are not set" + ) + self.client = create_vsphere_client( + server=self.server, + username=self.username, + password=self.password, + session=session, + ) + + def get_unverified_session(self): + """ + Returns an unverified session object + """ + + session = requests.session() + session.verify = False + requests.packages.urllib3.disable_warnings() + return session + + def get_vm(self, instance_id): + """ + Returns the VM ID corresponding to the VM Name (instance_id) + If there are multiple matches, this only returns the first one + """ + + names = set([instance_id]) + vms = self.client.vcenter.VM.list(VM.FilterSpec(names=names)) + + if len(vms) == 0: + logging.info("VM with name ({}) not found".format(instance_id)) + return None + vm = vms[0].vm + + return vm + + def release_instances(self, instance_id): + """ + Deletes the VM whose name is given by 'instance_id' + """ + + vm = self.get_vm(instance_id) + if not vm: + raise Exception( + "VM with the name ({}) does not exist." + "Please create the vm first.".format(instance_id) + ) + state = self.client.vcenter.vm.Power.get(vm) + if state == Power.Info(state=Power.State.POWERED_ON): + self.client.vcenter.vm.Power.stop(vm) + elif state == Power.Info(state=Power.State.SUSPENDED): + self.client.vcenter.vm.Power.start(vm) + self.client.vcenter.vm.Power.stop(vm) + self.client.vcenter.VM.delete(vm) + logging.info("Deleted VM -- '{}-({})'".format(instance_id, vm)) + + def reboot_instances(self, instance_id): + """ + Reboots the VM whose name is given by 'instance_id'. Returns True if successful, or + returns False if the VM is not powered on + """ + + vm = self.get_vm(instance_id) + try: + self.client.vcenter.vm.Power.reset(vm) + logging.info("Reset VM -- '{}-({})'".format(instance_id, vm)) + return True + except NotAllowedInCurrentState: + logging.info( + "VM '{}'-'({})' is not Powered On. Cannot reset it".format( + instance_id, vm + ) + ) + return False + + def stop_instances(self, instance_id): + """ + Stops the VM whose name is given by 'instance_id'. Returns True if successful, or + returns False if the VM is already powered off + """ + + vm = self.get_vm(instance_id) + try: + self.client.vcenter.vm.Power.stop(vm) + logging.info("Stopped VM -- '{}-({})'".format(instance_id, vm)) + return True + except AlreadyInDesiredState: + logging.info( + "VM '{}'-'({})' is already Powered Off".format(instance_id, vm) + ) + return False + + def start_instances(self, instance_id): + """ + Stops the VM whose name is given by 'instance_id'. 
Returns True if successful, or + returns False if the VM is already powered on + """ + + vm = self.get_vm(instance_id) + try: + self.client.vcenter.vm.Power.start(vm) + logging.info("Started VM -- '{}-({})'".format(instance_id, vm)) + return True + except AlreadyInDesiredState: + logging.info("VM '{}'-'({})' is already Powered On".format(instance_id, vm)) + return False + + def list_instances(self, datacenter): + """ + Returns a list of VMs present in the datacenter + """ + + datacenter_filter = self.client.vcenter.Datacenter.FilterSpec( + names=set([datacenter]) + ) + datacenter_summaries = self.client.vcenter.Datacenter.list(datacenter_filter) + try: + datacenter_id = datacenter_summaries[0].datacenter + except IndexError: + logging.error("Datacenter '{}' doesn't exist".format(datacenter)) + sys.exit(1) + + vm_filter = self.client.vcenter.VM.FilterSpec(datacenters={datacenter_id}) + vm_summaries = self.client.vcenter.VM.list(vm_filter) + vm_names = [] + for vm in vm_summaries: + vm_names.append({"vm_name": vm.name, "vm_id": vm.vm}) + return vm_names + + def get_datacenter_list(self): + """ + Returns a dictionary containing all the datacenter names and IDs + """ + + datacenter_summaries = self.client.vcenter.Datacenter.list() + datacenter_names = [ + {"datacenter_id": datacenter.datacenter, "datacenter_name": datacenter.name} + for datacenter in datacenter_summaries + ] + return datacenter_names + + def get_datastore_list(self, datacenter=None): + """ + Returns a dictionary containing all the datastore names and IDs belonging to a specific datacenter + """ + + datastore_filter = self.client.vcenter.Datastore.FilterSpec( + datacenters={datacenter} + ) + datastore_summaries = self.client.vcenter.Datastore.list(datastore_filter) + datastore_names = [] + for datastore in datastore_summaries: + datastore_names.append( + {"datastore_name": datastore.name, "datastore_id": datastore.datastore} + ) + return datastore_names + + def get_folder_list(self, datacenter=None): + """ + Returns a dictionary containing all the folder names and IDs belonging to a specific datacenter + """ + + folder_filter = self.client.vcenter.Folder.FilterSpec(datacenters={datacenter}) + folder_summaries = self.client.vcenter.Folder.list(folder_filter) + folder_names = [] + for folder in folder_summaries: + folder_names.append( + {"folder_name": folder.name, "folder_id": folder.folder} + ) + return folder_names + + def get_resource_pool(self, datacenter, resource_pool_name=None): + """ + Returns the identifier of the resource pool with the given name or the + first resource pool in the datacenter if the name is not provided. + """ + + names = set([resource_pool_name]) if resource_pool_name else None + filter_spec = ResourcePool.FilterSpec( + datacenters=set([datacenter]), names=names + ) + resource_pool_summaries = self.client.vcenter.ResourcePool.list(filter_spec) + if len(resource_pool_summaries) > 0: + resource_pool = resource_pool_summaries[0].resource_pool + return resource_pool + else: + logging.error( + "ResourcePool not found in Datacenter '{}'".format(datacenter) + ) + return None + + def create_default_vm(self, guest_os="RHEL_7_64", max_attempts=10): + """ + Creates a default VM with 2 GB memory, 1 CPU and 16 GB disk space in a + random datacenter. Accepts the guest OS as a parameter. Since the VM placement + is random, it might fail due to resource constraints. 
So, this function tries + for upto 'max_attempts' to create the VM + """ + + def create_vm(vm_name, resource_pool, folder, datastore, guest_os): + """ + Creates a VM and returns its ID and name. Requires the VM name, resource + pool name, folder name, datastore and the guest OS + """ + + placement_spec = VM.PlacementSpec( + folder=folder, resource_pool=resource_pool, datastore=datastore + ) + vm_create_spec = VM.CreateSpec( + name=vm_name, guest_os=guest_os, placement=placement_spec + ) + + vm_id = self.client.vcenter.VM.create(vm_create_spec) + return vm_id + + for _ in range(max_attempts): + try: + datacenter_list = self.get_datacenter_list() + datacenter = random.choice(datacenter_list) + resource_pool = self.get_resource_pool(datacenter["datacenter_id"]) + folder = random.choice( + self.get_folder_list(datacenter["datacenter_id"]) + )["folder_id"] + datastore = random.choice( + self.get_datastore_list(datacenter["datacenter_id"]) + )["datastore_id"] + vm_name = "Test-" + str(time.time_ns()) + return ( + create_vm(vm_name, resource_pool, folder, datastore, guest_os), + vm_name, + ) + except Exception as e: + pass + logging.error( + "Default VM could not be created in {} attempts. Check your VMware resources".format( + max_attempts + ) + ) + return None, None + + def get_vm_status(self, instance_id): + """ + Returns the status of the VM whose name is given by 'instance_id' + """ + + try: + vm = self.get_vm(instance_id) + state = self.client.vcenter.vm.Power.get(vm).state + logging.info("Check instance %s status", instance_id) + return state + except Exception as e: + logging.error( + "Failed to get node instance status %s. Encountered following " + "exception: %s." % (instance_id, e) + ) + return None + + def wait_until_released(self, instance_id, timeout): + """ + Waits until the VM is deleted or until the timeout. Returns True if + the VM is successfully deleted, else returns False + """ + + time_counter = 0 + vm = self.get_vm(instance_id) + while vm is not None: + vm = self.get_vm(instance_id) + logging.info( + "VM %s is still being deleted, sleeping for 5 seconds" % instance_id + ) + time.sleep(5) + time_counter += 5 + if time_counter >= timeout: + logging.info( + "VM %s is still not deleted in allotted time" % instance_id + ) + return False + return True + + def wait_until_running(self, instance_id, timeout): + """ + Waits until the VM switches to POWERED_ON state or until the timeout. + Returns True if the VM switches to POWERED_ON, else returns False + """ + + time_counter = 0 + status = self.get_vm_status(instance_id) + while status != Power.State.POWERED_ON: + status = self.get_vm_status(instance_id) + logging.info( + "VM %s is still not running, sleeping for 5 seconds" % instance_id + ) + time.sleep(5) + time_counter += 5 + if time_counter >= timeout: + logging.info("VM %s is still not ready in allotted time" % instance_id) + return False + return True + + def wait_until_stopped(self, instance_id, timeout): + """ + Waits until the VM switches to POWERED_OFF state or until the timeout. 
+ Returns True if the VM switches to POWERED_OFF, else returns False + """ + + time_counter = 0 + status = self.get_vm_status(instance_id) + while status != Power.State.POWERED_OFF: + status = self.get_vm_status(instance_id) + logging.info( + "VM %s is still not running, sleeping for 5 seconds" % instance_id + ) + time.sleep(5) + time_counter += 5 + if time_counter >= timeout: + logging.info("VM %s is still not ready in allotted time" % instance_id) + return False + return True + + +@dataclass +class Node: + name: str + + +@dataclass +class NodeScenarioSuccessOutput: + + nodes: typing.Dict[int, Node] = field( + metadata={ + "name": "Nodes started/stopped/terminated/rebooted", + "description": """Map between timestamps and the pods started/stopped/terminated/rebooted. + The timestamp is provided in nanoseconds""", + } + ) + action: kube_helper.Actions = field( + metadata={ + "name": "The action performed on the node", + "description": """The action performed or attempted to be performed on the node. Possible values + are : Start, Stop, Terminate, Reboot""", + } + ) + + +@dataclass +class NodeScenarioErrorOutput: + + error: str + action: kube_helper.Actions = field( + metadata={ + "name": "The action performed on the node", + "description": """The action attempted to be performed on the node. Possible values are : Start + Stop, Terminate, Reboot""", + } + ) + + +@dataclass +class NodeScenarioConfig: + + name: typing.Annotated[ + typing.Optional[str], + validation.required_if_not("label_selector"), + validation.required_if("skip_openshift_checks"), + ] = field( + default=None, + metadata={ + "name": "Name", + "description": "Name(s) for target nodes. Required if label_selector is not set.", + }, + ) + + runs: typing.Annotated[typing.Optional[int], validation.min(1)] = field( + default=1, + metadata={ + "name": "Number of runs per node", + "description": "Number of times to inject each scenario under actions (will perform on same node each time)", + }, + ) + + label_selector: typing.Annotated[ + typing.Optional[str], validation.min(1), validation.required_if_not("name") + ] = field( + default=None, + metadata={ + "name": "Label selector", + "description": "Kubernetes label selector for the target nodes. Required if name is not set.\n" + "See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ for details.", + }, + ) + + timeout: typing.Annotated[typing.Optional[int], validation.min(1)] = field( + default=180, + metadata={ + "name": "Timeout", + "description": "Timeout to wait for the target pod(s) to be removed in seconds.", + }, + ) + + instance_count: typing.Annotated[typing.Optional[int], validation.min(1)] = field( + default=1, + metadata={ + "name": "Instance Count", + "description": "Number of nodes to perform action/select that match the label selector.", + }, + ) + + skip_openshift_checks: typing.Optional[bool] = field( + default=False, + metadata={ + "name": "Skip Openshift Checks", + "description": "Skip checking the status of the openshift nodes.", + }, + ) + + verify_session: bool = field( + default=True, + metadata={ + "name": "Verify API Session", + "description": "Verifies the vSphere client session. It is enabled by default", + }, + ) + + kubeconfig_path: typing.Optional[str] = field( + default=None, + metadata={ + "name": "Kubeconfig path", + "description": "Path to your Kubeconfig file. 
Defaults to ~/.kube/config.\n" + "See https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/ for " + "details.", + }, + ) + + +@plugin.step( + id="node-start", + name="Start the node", + description="Start the node(s) by starting the VMware VM on which the node is configured", + outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput}, +) +def node_start( + cfg: NodeScenarioConfig, +) -> typing.Tuple[ + str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput] +]: + with kube_helper.setup_kubernetes(None) as cli: + vsphere = vSphere(verify=cfg.verify_session) + core_v1 = client.CoreV1Api(cli) + watch_resource = watch.Watch() + node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.START, core_v1) + nodes_started = {} + for name in node_list: + try: + for _ in range(cfg.runs): + logging.info("Starting node_start_scenario injection") + logging.info("Starting the node %s " % (name)) + vm_started = vsphere.start_instances(name) + if vm_started: + vsphere.wait_until_running(name, cfg.timeout) + if not cfg.skip_openshift_checks: + kube_helper.wait_for_ready_status( + name, cfg.timeout, watch_resource, core_v1 + ) + nodes_started[int(time.time_ns())] = Node(name=name) + logging.info("Node with instance ID: %s is in running state" % name) + logging.info("node_start_scenario has been successfully injected!") + except Exception as e: + logging.error("Failed to start node instance. Test Failed") + logging.error("node_start_scenario injection failed!") + return "error", NodeScenarioErrorOutput( + format_exc(), kube_helper.Actions.START + ) + + return "success", NodeScenarioSuccessOutput( + nodes_started, kube_helper.Actions.START + ) + + +@plugin.step( + id="node-stop", + name="Stop the node", + description="Stop the node(s) by stopping the VMware VM on which the node is configured", + outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput}, +) +def node_stop( + cfg: NodeScenarioConfig, +) -> typing.Tuple[ + str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput] +]: + with kube_helper.setup_kubernetes(None) as cli: + vsphere = vSphere(verify=cfg.verify_session) + core_v1 = client.CoreV1Api(cli) + watch_resource = watch.Watch() + node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.STOP, core_v1) + nodes_stopped = {} + for name in node_list: + try: + for _ in range(cfg.runs): + logging.info("Starting node_stop_scenario injection") + logging.info("Stopping the node %s " % (name)) + vm_stopped = vsphere.stop_instances(name) + if vm_stopped: + vsphere.wait_until_stopped(name, cfg.timeout) + if not cfg.skip_openshift_checks: + kube_helper.wait_for_unknown_status( + name, cfg.timeout, watch_resource, core_v1 + ) + nodes_stopped[int(time.time_ns())] = Node(name=name) + logging.info("Node with instance ID: %s is in stopped state" % name) + logging.info("node_stop_scenario has been successfully injected!") + except Exception as e: + logging.error("Failed to stop node instance.
Test Failed") + logging.error("node_stop_scenario injection failed!") + return "error", NodeScenarioErrorOutput( + format_exc(), kube_helper.Actions.STOP + ) + + return "success", NodeScenarioSuccessOutput( + nodes_stopped, kube_helper.Actions.STOP + ) + + +@plugin.step( + id="node-reboot", + name="Reboot VMware VM", + description="Reboot the node(s) by resetting the VMware VM on which the node is configured", + outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput}, +) +def node_reboot( + cfg: NodeScenarioConfig, +) -> typing.Tuple[ + str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput] +]: + with kube_helper.setup_kubernetes(None) as cli: + vsphere = vSphere(verify=cfg.verify_session) + core_v1 = client.CoreV1Api(cli) + watch_resource = watch.Watch() + node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.REBOOT, core_v1) + nodes_rebooted = {} + for name in node_list: + try: + for _ in range(cfg.runs): + logging.info("Starting node_reboot_scenario injection") + logging.info("Rebooting the node %s " % (name)) + vsphere.reboot_instances(name) + if not cfg.skip_openshift_checks: + kube_helper.wait_for_unknown_status( + name, cfg.timeout, watch_resource, core_v1 + ) + kube_helper.wait_for_ready_status( + name, cfg.timeout, watch_resource, core_v1 + ) + nodes_rebooted[int(time.time_ns())] = Node(name=name) + logging.info( + "Node with instance ID: %s has rebooted successfully" % name + ) + logging.info("node_reboot_scenario has been successfully injected!") + except Exception as e: + logging.error("Failed to reboot node instance. Test Failed") + logging.error("node_reboot_scenario injection failed!") + return "error", NodeScenarioErrorOutput( + format_exc(), kube_helper.Actions.REBOOT + ) + + return "success", NodeScenarioSuccessOutput( + nodes_rebooted, kube_helper.Actions.REBOOT + ) + + +@plugin.step( + id="node-terminate", + name="Terminate VMware VM", + description="Terminate the node(s) by stopping and deleting the VMware VM on which the node is configured", + outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput}, +) +def node_terminate( + cfg: NodeScenarioConfig, +) -> typing.Tuple[ + str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput] +]: + with kube_helper.setup_kubernetes(None) as cli: + vsphere = vSphere(verify=cfg.verify_session) + core_v1 = client.CoreV1Api(cli) + node_list = kube_helper.get_node_list( + cfg, kube_helper.Actions.TERMINATE, core_v1 + ) + nodes_terminated = {} + for name in node_list: + try: + for _ in range(cfg.runs): + logging.info( + "Starting node_termination_scenario injection by first stopping the node" + ) + vsphere.stop_instances(name) + vsphere.wait_until_stopped(name, cfg.timeout) + logging.info("Releasing the node with instance ID: %s " % (name)) + vsphere.release_instances(name) + vsphere.wait_until_released(name, cfg.timeout) + nodes_terminated[int(time.time_ns())] = Node(name=name) + logging.info("Node with instance ID: %s has been released" % name) + logging.info( + "node_terminate_scenario has been successfully injected!" + ) + except Exception as e: + logging.error("Failed to terminate node instance.
Test Failed") + logging.error("node_terminate_scenario injection failed!") + return "error", NodeScenarioErrorOutput( + format_exc(), kube_helper.Actions.TERMINATE + ) + + return "success", NodeScenarioSuccessOutput( + nodes_terminated, kube_helper.Actions.TERMINATE + ) diff --git a/requirements.txt b/requirements.txt index 0f28cbd4..e8165ddb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ oauth2client>=4.1.3 python-openstackclient gitpython paramiko -setuptools +setuptools==63.4.1 openshift-client python-ipmi podman-compose @@ -23,3 +23,4 @@ werkzeug==2.0.3 aliyun-python-sdk-core-v3 aliyun-python-sdk-ecs arcaflow-plugin-sdk==0.3.0 +git+https://github.com/vmware/vsphere-automation-sdk-python.git \ No newline at end of file diff --git a/scenarios/openshift/vmware_node_scenarios.yml b/scenarios/openshift/vmware_node_scenarios.yml new file mode 100644 index 00000000..9502d0e0 --- /dev/null +++ b/scenarios/openshift/vmware_node_scenarios.yml @@ -0,0 +1,10 @@ +# yaml-language-server: $schema=../plugin.schema.json +- id: + config: + name: # Node on which scenario has to be injected; can set multiple names separated by comma + label_selector: # When node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection + runs: 1 # Number of times to inject each scenario under actions (will perform on same node each time) + instance_count: 1 # Number of nodes to perform action/select that match the label selector + timeout: 300 # Duration to wait for completion of node scenario injection + verify_session: True # Set to True if you want to verify the vSphere client session using certificates; else False + skip_openshift_checks: False # Set to True if you don't want to wait for the status of the nodes to change on OpenShift before passing the scenario \ No newline at end of file
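The scenario file above maps onto the new plugin steps (node-start, node-stop, node-reboot, node-terminate): when Kraken loads it, the id field is expected to name one of these steps and the remaining keys populate NodeScenarioConfig. As a rough sketch of how the same configuration drives a step when invoked directly from Python, assuming a reachable vSphere endpoint, a valid kubeconfig, and an example label selector:

```python
from kraken.plugins.vmware import vmware_plugin

# Mirrors scenarios/openshift/vmware_node_scenarios.yml: select one worker node by label,
# reboot it once, and wait up to 300 seconds for it to come back.
config = vmware_plugin.NodeScenarioConfig(
    label_selector="node-role.kubernetes.io/worker",  # example selector, adjust for your cluster
    runs=1,
    instance_count=1,
    timeout=300,
    verify_session=False,            # as in the unit tests, skip TLS verification
    skip_openshift_checks=False,
)

output_id, output_data = vmware_plugin.node_reboot(config)
if output_id == "error":
    print(output_data.error)
else:
    print(output_data.nodes)
```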