adding vsphere updates to non native
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m19s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped

Signed-off-by: Paige Patton <prubenda@redhat.com>
This commit is contained in:
Paige Patton
2024-12-12 16:11:33 -05:00
committed by Naga Ravi Chaitanya Elluri
parent b024cfde19
commit 21ab8d475d
8 changed files with 158 additions and 476 deletions

View File

@@ -93,12 +93,7 @@ How to set up Alibaba cli to run node scenarios is defined [here](cloud_setup.md
#### VMware
How to set up VMware vSphere to run node scenarios is defined [here](cloud_setup.md#vmware)
This cloud type uses a different configuration style, see actions below and [example config file](../scenarios/openshift/vmware_node_scenarios.yml)
- vmware-node-terminate
- vmware-node-reboot
- vmware-node-stop
- vmware-node-start
See [example config file](../scenarios/openshift/vmware_node_scenarios.yml)

View File

@@ -49,7 +49,6 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
return [
"pod_disruption_scenarios",
"pod_network_scenarios",
"vmware_node_scenarios",
"ibmcloud_node_scenarios",
]

View File

@@ -18,9 +18,6 @@ from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin i
)
from arcaflow_plugin_sdk import schema, serialization, jsonschema
from krkn.scenario_plugins.native.node_scenarios import vmware_plugin
@dataclasses.dataclass
class PluginStep:
schema: schema.StepSchema
@@ -160,10 +157,6 @@ PLUGINS = Plugins(
),
PluginStep(wait_for_pods, ["error"]),
PluginStep(run_python_file, ["error"]),
PluginStep(vmware_plugin.node_start, ["error"]),
PluginStep(vmware_plugin.node_stop, ["error"]),
PluginStep(vmware_plugin.node_reboot, ["error"]),
PluginStep(vmware_plugin.node_terminate, ["error"]),
PluginStep(ibmcloud_plugin.node_start, ["error"]),
PluginStep(ibmcloud_plugin.node_stop, ["error"]),
PluginStep(ibmcloud_plugin.node_reboot, ["error"]),

View File

@@ -199,7 +199,7 @@ class Alibaba:
return False
end_time = time.time()
if affected_node:
affected_node.set_affected_node_status("running", end_time - start_time)
affected_node.set_affected_node_status("stopped", end_time - start_time)
return True
# Wait until the node instance is terminated

View File

@@ -22,6 +22,7 @@ from krkn.scenario_plugins.node_actions.gcp_node_scenarios import gcp_node_scena
from krkn.scenario_plugins.node_actions.general_cloud_node_scenarios import (
general_node_scenarios,
)
from krkn.scenario_plugins.node_actions.vmware_node_scenarios import vmware_node_scenarios
node_general = False
@@ -81,12 +82,12 @@ class NodeActionsScenarioPlugin(AbstractScenarioPlugin):
return openstack_node_scenarios(kubecli, affected_nodes_status)
elif (
node_scenario["cloud_type"].lower() == "azure"
or node_scenario["cloud_type"] == "az"
or node_scenario["cloud_type"].lower() == "az"
):
return azure_node_scenarios(kubecli, affected_nodes_status)
elif (
node_scenario["cloud_type"].lower() == "alibaba"
or node_scenario["cloud_type"] == "alicloud"
or node_scenario["cloud_type"].lower() == "alicloud"
):
from krkn.scenario_plugins.node_actions.alibaba_node_scenarios import (
alibaba_node_scenarios,
@@ -106,7 +107,12 @@ class NodeActionsScenarioPlugin(AbstractScenarioPlugin):
affected_nodes_status
)
elif node_scenario["cloud_type"].lower() == "docker":
return docker_node_scenarios(kubecli, affected_nodes_status)
return docker_node_scenarios(kubecli)
elif (
node_scenario["cloud_type"].lower() == "vsphere"
or node_scenario["cloud_type"].lower() == "vmware"
):
return vmware_node_scenarios(kubecli, affected_nodes_status)
else:
logging.error(
"Cloud type "

View File

@@ -3,25 +3,25 @@ import logging
import random
import sys
import time
import typing
from dataclasses import dataclass, field
import urllib3
from krkn_lib.k8s import KrknKubernetes
import krkn.scenario_plugins.node_actions.common_node_functions as nodeaction
from krkn.scenario_plugins.node_actions.abstract_node_scenarios import (
abstract_node_scenarios,
)
from dataclasses import dataclass
from os import environ
from traceback import format_exc
import requests
from arcaflow_plugin_sdk import plugin, validation
from com.vmware.vapi.std.errors_client import (
AlreadyInDesiredState,
NotAllowedInCurrentState,
)
from com.vmware.vcenter.vm_client import Power
from com.vmware.vcenter_client import VM, ResourcePool
from kubernetes import client, watch
from vmware.vapi.vsphere.client import create_vsphere_client
from krkn.scenario_plugins.native.node_scenarios import (
kubernetes_functions as kube_helper,
)
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
class vSphere:
def __init__(self, verify=True):
@@ -32,7 +32,7 @@ class vSphere:
self.server = environ.get("VSPHERE_IP")
self.username = environ.get("VSPHERE_USERNAME")
self.password = environ.get("VSPHERE_PASSWORD")
session = self.get_unverified_session() if not verify else None
session = self.get_unverified_session()
self.credentials_present = (
True if self.server and self.username and self.password else False
)
@@ -42,6 +42,7 @@ class vSphere:
"'VSPHERE_IP', 'VSPHERE_USERNAME', "
"'VSPHERE_PASSWORD' are not set"
)
self.client = create_vsphere_client(
server=self.server,
username=self.username,
@@ -53,10 +54,13 @@ class vSphere:
"""
Returns an unverified session object
"""
session = requests.session()
# Set the proxy settings for the session
session.verify = False
requests.packages.urllib3.disable_warnings()
urllib3.disable_warnings()
return session
def get_vm(self, instance_id):
@@ -297,14 +301,16 @@ class vSphere:
)
return None
def wait_until_released(self, instance_id, timeout):
def wait_until_released(self, instance_id, timeout, affected_node):
"""
Waits until the VM is deleted or until the timeout. Returns True if
the VM is successfully deleted, else returns False
"""
time_counter = 0
start_time = time.time()
vm = self.get_vm(instance_id)
exit_status = True
while vm is not None:
vm = self.get_vm(instance_id)
logging.info(
@@ -314,16 +320,22 @@ class vSphere:
time_counter += 5
if time_counter >= timeout:
logging.info(f"VM {instance_id} is still not deleted in allotted time")
return False
return True
exit_status = False
end_time = time.time()
if affected_node:
affected_node.set_affected_node_status("terminated", end_time - start_time)
return exit_status
def wait_until_running(self, instance_id, timeout):
def wait_until_running(self, instance_id, timeout, affected_node):
"""
Waits until the VM switches to POWERED_ON state or until the timeout.
Returns True if the VM switches to POWERED_ON, else returns False
"""
time_counter = 0
start_time = time.time()
exit_status = True
status = self.get_vm_status(instance_id)
while status != Power.State.POWERED_ON:
status = self.get_vm_status(instance_id)
@@ -334,16 +346,23 @@ class vSphere:
time_counter += 5
if time_counter >= timeout:
logging.info(f"VM {instance_id} is still not ready in allotted time")
return False
return True
exit_status = False
end_time = time.time()
if affected_node:
affected_node.set_affected_node_status("running", end_time - start_time)
def wait_until_stopped(self, instance_id, timeout):
return exit_status
def wait_until_stopped(self, instance_id, timeout, affected_node):
"""
Waits until the VM switches to POWERED_OFF state or until the timeout.
Returns True if the VM switches to POWERED_OFF, else returns False
"""
time_counter = 0
start_time = time.time()
exit_status = True
status = self.get_vm_status(instance_id)
while status != Power.State.POWERED_OFF:
status = self.get_vm_status(instance_id)
@@ -354,322 +373,106 @@ class vSphere:
time_counter += 5
if time_counter >= timeout:
logging.info(f"VM {instance_id} is still not ready in allotted time")
return False
return True
exit_status = False
end_time = time.time()
if affected_node:
affected_node.set_affected_node_status("stopped", end_time - start_time)
return exit_status
@dataclass
class Node:
name: str
class vmware_node_scenarios(abstract_node_scenarios):
def __init__(self, kubecli: KrknKubernetes, affected_nodes_status: AffectedNodeStatus):
super().__init__(kubecli, affected_nodes_status)
self.vsphere = vSphere()
def node_start_scenario(self, instance_kill_count, node, timeout):
try:
for _ in range(instance_kill_count):
affected_node = AffectedNode(node)
logging.info("Starting node_start_scenario injection")
logging.info(f"Starting the node {node} ")
vm_started = self.vsphere.start_instances(node)
if vm_started:
self.vsphere.wait_until_running(node, timeout, affected_node)
nodeaction.wait_for_ready_status(node, timeout, self.kubecli, affected_node)
logging.info(f"Node with instance ID: {node} is in running state")
logging.info("node_start_scenario has been successfully injected!")
self.affected_nodes_status.affected_nodes.append(affected_node)
except Exception as e:
logging.error("Failed to start node instance. Test Failed")
logging.error(
f"node_start_scenario injection failed! " f"Error was: {str(e)}"
)
@dataclass
class NodeScenarioSuccessOutput:
nodes: typing.Dict[int, Node] = field(
metadata={
"name": "Nodes started/stopped/terminated/rebooted",
"description": "Map between timestamps and the pods "
"started/stopped/terminated/rebooted. "
"The timestamp is provided in nanoseconds",
}
)
action: kube_helper.Actions = field(
metadata={
"name": "The action performed on the node",
"description": "The action performed or attempted to be "
"performed on the node. Possible values"
"are : Start, Stop, Terminate, Reboot",
}
)
@dataclass
class NodeScenarioErrorOutput:
error: str
action: kube_helper.Actions = field(
metadata={
"name": "The action performed on the node",
"description": "The action attempted to be performed on the node. "
"Possible values are : Start Stop, Terminate, Reboot",
}
)
@dataclass
class NodeScenarioConfig:
name: typing.Annotated[
typing.Optional[str],
validation.required_if_not("label_selector"),
validation.required_if("skip_openshift_checks"),
] = field(
default=None,
metadata={
"name": "Name",
"description": "Name(s) for target nodes. "
"Required if label_selector is not set.",
},
)
runs: typing.Annotated[typing.Optional[int], validation.min(1)] = field(
default=1,
metadata={
"name": "Number of runs per node",
"description": "Number of times to inject each scenario under "
"actions (will perform on same node each time)",
},
)
label_selector: typing.Annotated[
typing.Optional[str], validation.min(1), validation.required_if_not("name")
] = field(
default=None,
metadata={
"name": "Label selector",
"description": "Kubernetes label selector for the target nodes. "
"Required if name is not set.\n"
"See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ " # noqa
"for details.",
},
)
timeout: typing.Annotated[typing.Optional[int], validation.min(1)] = field(
default=180,
metadata={
"name": "Timeout",
"description": "Timeout to wait for the target pod(s) "
"to be removed in seconds.",
},
)
instance_count: typing.Annotated[typing.Optional[int], validation.min(1)] = field(
default=1,
metadata={
"name": "Instance Count",
"description": "Number of nodes to perform action/select "
"that match the label selector.",
},
)
skip_openshift_checks: typing.Optional[bool] = field(
default=False,
metadata={
"name": "Skip Openshift Checks",
"description": "Skip checking the status of the openshift nodes.",
},
)
verify_session: bool = field(
default=True,
metadata={
"name": "Verify API Session",
"description": "Verifies the vSphere client session. "
"It is enabled by default",
},
)
kubeconfig_path: typing.Optional[str] = field(
default=None,
metadata={
"name": "Kubeconfig path",
"description": "Path to your Kubeconfig file. "
"Defaults to ~/.kube/config.\n"
"See https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/ " # noqa
"for details.",
},
)
@plugin.step(
id="vmware-node-start",
name="Start the node",
description="Start the node(s) by starting the VMware VM "
"on which the node is configured",
outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput},
)
def node_start(
cfg: NodeScenarioConfig,
) -> typing.Tuple[
str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput]
]:
with kube_helper.setup_kubernetes(None) as cli:
vsphere = vSphere(verify=cfg.verify_session)
core_v1 = client.CoreV1Api(cli)
watch_resource = watch.Watch()
node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.START, core_v1)
nodes_started = {}
for name in node_list:
try:
for _ in range(cfg.runs):
logging.info("Starting node_start_scenario injection")
logging.info(f"Starting the node {name} ")
vm_started = vsphere.start_instances(name)
if vm_started:
vsphere.wait_until_running(name, cfg.timeout)
if not cfg.skip_openshift_checks:
kube_helper.wait_for_ready_status(
name, cfg.timeout, watch_resource, core_v1
)
nodes_started[int(time.time_ns())] = Node(name=name)
logging.info(f"Node with instance ID: {name} is in running state")
logging.info("node_start_scenario has been successfully injected!")
except Exception as e:
logging.error("Failed to start node instance. Test Failed")
logging.error(
f"node_start_scenario injection failed! " f"Error was: {str(e)}"
)
return "error", NodeScenarioErrorOutput(
format_exc(), kube_helper.Actions.START
)
return "success", NodeScenarioSuccessOutput(
nodes_started, kube_helper.Actions.START
)
@plugin.step(
id="vmware-node-stop",
name="Stop the node",
description="Stop the node(s) by starting the VMware VM "
"on which the node is configured",
outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput},
)
def node_stop(
cfg: NodeScenarioConfig,
) -> typing.Tuple[
str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput]
]:
with kube_helper.setup_kubernetes(None) as cli:
vsphere = vSphere(verify=cfg.verify_session)
core_v1 = client.CoreV1Api(cli)
watch_resource = watch.Watch()
node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.STOP, core_v1)
nodes_stopped = {}
for name in node_list:
try:
for _ in range(cfg.runs):
logging.info("Starting node_stop_scenario injection")
logging.info(f"Stopping the node {name} ")
vm_stopped = vsphere.stop_instances(name)
if vm_stopped:
vsphere.wait_until_stopped(name, cfg.timeout)
if not cfg.skip_openshift_checks:
kube_helper.wait_for_ready_status(
name, cfg.timeout, watch_resource, core_v1
)
nodes_stopped[int(time.time_ns())] = Node(name=name)
logging.info(f"Node with instance ID: {name} is in stopped state")
logging.info("node_stop_scenario has been successfully injected!")
except Exception as e:
logging.error("Failed to stop node instance. Test Failed")
logging.error(
f"node_stop_scenario injection failed! " f"Error was: {str(e)}"
)
return "error", NodeScenarioErrorOutput(
format_exc(), kube_helper.Actions.STOP
)
return "success", NodeScenarioSuccessOutput(
nodes_stopped, kube_helper.Actions.STOP
)
@plugin.step(
id="vmware-node-reboot",
name="Reboot VMware VM",
description="Reboot the node(s) by starting the VMware VM "
"on which the node is configured",
outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput},
)
def node_reboot(
cfg: NodeScenarioConfig,
) -> typing.Tuple[
str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput]
]:
with kube_helper.setup_kubernetes(None) as cli:
vsphere = vSphere(verify=cfg.verify_session)
core_v1 = client.CoreV1Api(cli)
watch_resource = watch.Watch()
node_list = kube_helper.get_node_list(cfg, kube_helper.Actions.REBOOT, core_v1)
nodes_rebooted = {}
for name in node_list:
try:
for _ in range(cfg.runs):
logging.info("Starting node_reboot_scenario injection")
logging.info(f"Rebooting the node {name} ")
vsphere.reboot_instances(name)
if not cfg.skip_openshift_checks:
kube_helper.wait_for_unknown_status(
name, cfg.timeout, watch_resource, core_v1
)
kube_helper.wait_for_ready_status(
name, cfg.timeout, watch_resource, core_v1
)
nodes_rebooted[int(time.time_ns())] = Node(name=name)
logging.info(
f"Node with instance ID: {name} has rebooted " "successfully"
def node_stop_scenario(self, instance_kill_count, node, timeout):
try:
for _ in range(instance_kill_count):
affected_node = AffectedNode(node)
logging.info("Starting node_stop_scenario injection")
logging.info(f"Stopping the node {node} ")
vm_stopped = self.vsphere.stop_instances(node)
if vm_stopped:
self.vsphere.wait_until_stopped(node, timeout, affected_node)
nodeaction.wait_for_ready_status(
node, timeout, self.kubecli, affected_node
)
logging.info("node_reboot_scenario has been successfully injected!")
except Exception as e:
logging.error("Failed to reboot node instance. Test Failed")
logging.error(
f"node_reboot_scenario injection failed! " f"Error was: {str(e)}"
)
return "error", NodeScenarioErrorOutput(
format_exc(), kube_helper.Actions.REBOOT
)
logging.info(f"Node with instance ID: {node} is in stopped state")
logging.info("node_stop_scenario has been successfully injected!")
self.affected_nodes_status.affected_nodes.append(affected_node)
except Exception as e:
logging.error("Failed to stop node instance. Test Failed")
logging.error(
f"node_stop_scenario injection failed! " f"Error was: {str(e)}"
)
return "success", NodeScenarioSuccessOutput(
nodes_rebooted, kube_helper.Actions.REBOOT
)
def node_reboot_scenario(self, instance_kill_count, node, timeout):
try:
for _ in range(instance_kill_count):
affected_node = AffectedNode(node)
logging.info("Starting node_reboot_scenario injection")
logging.info(f"Rebooting the node {node} ")
self.vsphere.reboot_instances(node)
nodeaction.wait_for_unknown_status(
node, timeout, self.kubecli, affected_node
)
logging.info(
f"Node with instance ID: {node} has rebooted " "successfully"
)
logging.info("node_reboot_scenario has been successfully injected!")
self.affected_nodes_status.affected_nodes.append(affected_node)
except Exception as e:
logging.error("Failed to reboot node instance. Test Failed")
logging.error(
f"node_reboot_scenario injection failed! " f"Error was: {str(e)}"
)
@plugin.step(
id="vmware-node-terminate",
name="Reboot VMware VM",
description="Wait for the node to be terminated",
outputs={"success": NodeScenarioSuccessOutput, "error": NodeScenarioErrorOutput},
)
def node_terminate(
cfg: NodeScenarioConfig,
) -> typing.Tuple[
str, typing.Union[NodeScenarioSuccessOutput, NodeScenarioErrorOutput]
]:
with kube_helper.setup_kubernetes(None) as cli:
vsphere = vSphere(verify=cfg.verify_session)
core_v1 = client.CoreV1Api(cli)
node_list = kube_helper.get_node_list(
cfg, kube_helper.Actions.TERMINATE, core_v1
)
nodes_terminated = {}
for name in node_list:
try:
for _ in range(cfg.runs):
logging.info(
"Starting node_termination_scenario injection "
"by first stopping the node"
)
vsphere.stop_instances(name)
vsphere.wait_until_stopped(name, cfg.timeout)
logging.info(f"Releasing the node with instance ID: {name} ")
vsphere.release_instances(name)
vsphere.wait_until_released(name, cfg.timeout)
nodes_terminated[int(time.time_ns())] = Node(name=name)
logging.info(f"Node with instance ID: {name} has been released")
logging.info(
"node_terminate_scenario has been " "successfully injected!"
)
except Exception as e:
logging.error("Failed to terminate node instance. Test Failed")
logging.error(
f"node_terminate_scenario injection failed! " f"Error was: {str(e)}"
def node_terminate_scenario(self, instance_kill_count, node, timeout):
try:
for _ in range(instance_kill_count):
affected_node = AffectedNode(node)
logging.info(
"Starting node_termination_scenario injection "
"by first stopping the node"
)
return "error", NodeScenarioErrorOutput(
format_exc(), kube_helper.Actions.TERMINATE
self.vsphere.stop_instances(node)
self.vsphere.wait_until_stopped(node, timeout, affected_node)
logging.info(f"Releasing the node with instance ID: {node} ")
self.vsphere.release_instances(node)
self.vsphere.wait_until_released(node, timeout, affected_node)
logging.info(f"Node with instance ID: {node} has been released")
logging.info(
"node_terminate_scenario has been " "successfully injected!"
)
return "success", NodeScenarioSuccessOutput(
nodes_terminated, kube_helper.Actions.TERMINATE
)
self.affected_nodes_status.affected_nodes.append(affected_node)
except Exception as e:
logging.error("Failed to terminate node instance. Test Failed")
logging.error(
f"node_terminate_scenario injection failed! " f"Error was: {str(e)}"
)

View File

@@ -1,10 +1,17 @@
# yaml-language-server: $schema=../plugin.schema.json
- id: <vmware-node-stop/vmware-node-start/vmware-node-reboot/vmware-node-terminate>
config:
name: <node_name> # Node on which scenario has to be injected; can set multiple names separated by comma
label_selector: <label_selector> # When node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection
runs: 1 # Number of times to inject each scenario under actions (will perform on same node each time)
instance_count: 1 # Number of nodes to perform action/select that match the label selector
timeout: 300 # Duration to wait for completion of node scenario injection
verify_session: True # Set to True if you want to verify the vSphere client session using certificates; else False
skip_openshift_checks: False # Set to True if you don't want to wait for the status of the nodes to change on OpenShift before passing the scenario
node_scenarios:
- actions:
- node_reboot_scenario
node_name:
label_selector: node-role.kubernetes.io/worker
instance_count: 1
timeout: 120
cloud_type: vmware
- actions:
- node_stop_start_scenario
node_name:
label_selector: node-role.kubernetes.io/worker
instance_count: 1
timeout: 360
duration: 10
cloud_type: vmware
parallel: false

View File

@@ -1,121 +0,0 @@
import unittest
import os
import logging
from arcaflow_plugin_sdk import plugin
from krkn.scenario_plugins.native.node_scenarios import vmware_plugin
from krkn.scenario_plugins.native.node_scenarios.kubernetes_functions import Actions
class NodeScenariosTest(unittest.TestCase):
def setUp(self):
vsphere_env_vars = ["VSPHERE_IP", "VSPHERE_USERNAME", "VSPHERE_PASSWORD"]
self.credentials_present = all(
env_var in os.environ for env_var in vsphere_env_vars
)
def test_serialization(self):
plugin.test_object_serialization(
vmware_plugin.NodeScenarioConfig(name="test", skip_openshift_checks=True),
self.fail,
)
plugin.test_object_serialization(
vmware_plugin.NodeScenarioSuccessOutput(nodes={}, action=Actions.START),
self.fail,
)
plugin.test_object_serialization(
vmware_plugin.NodeScenarioErrorOutput(
error="Hello World", action=Actions.START
),
self.fail,
)
def test_node_start(self):
if not self.credentials_present:
self.skipTest(
"Check if the environmental variables 'VSPHERE_IP', "
"'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set"
)
vsphere = vmware_plugin.vSphere(verify=False)
vm_id, vm_name = vsphere.create_default_vm()
if vm_id is None:
self.fail("Could not create test VM")
output_id, output_data = vmware_plugin.node_start(
vmware_plugin.NodeScenarioConfig(
name=vm_name, skip_openshift_checks=True, verify_session=False
)
)
if output_id == "error":
logging.error(output_data.error)
self.fail("The VMware VM did not start because an error occurred")
vsphere.release_instances(vm_name)
def test_node_stop(self):
if not self.credentials_present:
self.skipTest(
"Check if the environmental variables 'VSPHERE_IP', "
"'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set"
)
vsphere = vmware_plugin.vSphere(verify=False)
vm_id, vm_name = vsphere.create_default_vm()
if vm_id is None:
self.fail("Could not create test VM")
vsphere.start_instances(vm_name)
output_id, output_data = vmware_plugin.node_stop(
vmware_plugin.NodeScenarioConfig(
name=vm_name, skip_openshift_checks=True, verify_session=False
)
)
if output_id == "error":
logging.error(output_data.error)
self.fail("The VMware VM did not stop because an error occurred")
vsphere.release_instances(vm_name)
def test_node_reboot(self):
if not self.credentials_present:
self.skipTest(
"Check if the environmental variables 'VSPHERE_IP', "
"'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set"
)
vsphere = vmware_plugin.vSphere(verify=False)
vm_id, vm_name = vsphere.create_default_vm()
if vm_id is None:
self.fail("Could not create test VM")
vsphere.start_instances(vm_name)
output_id, output_data = vmware_plugin.node_reboot(
vmware_plugin.NodeScenarioConfig(
name=vm_name, skip_openshift_checks=True, verify_session=False
)
)
if output_id == "error":
logging.error(output_data.error)
self.fail("The VMware VM did not reboot because an error occurred")
vsphere.release_instances(vm_name)
def test_node_terminate(self):
if not self.credentials_present:
self.skipTest(
"Check if the environmental variables 'VSPHERE_IP', "
"'VSPHERE_USERNAME', 'VSPHERE_PASSWORD' are set"
)
vsphere = vmware_plugin.vSphere(verify=False)
vm_id, vm_name = vsphere.create_default_vm()
if vm_id is None:
self.fail("Could not create test VM")
vsphere.start_instances(vm_name)
output_id, output_data = vmware_plugin.node_terminate(
vmware_plugin.NodeScenarioConfig(
name=vm_name, skip_openshift_checks=True, verify_session=False
)
)
if output_id == "error":
logging.error(output_data.error)
self.fail("The VMware VM did not reboot because an error occurred")
if __name__ == "__main__":
unittest.main()