Mirror of https://github.com/krkn-chaos/krkn.git (synced 2026-02-14 18:10:00 +00:00)
Refactor code base

This commit:
- Refactors the code base to be more modular by moving functions into their respective modules, making it lean and reusable.
- Uses black to reformat the code to follow PEP 8 practices.
This commit is contained in:
@@ -47,6 +47,8 @@ We are always looking for more enhancements, fixes to make it better, any contri
 [More information on how to Contribute](docs/contribute.md)

 ### Community
-Key Members(slack_usernames): paigerube14, rook, mffiedler, mohit, dry923, rsevilla, ravi
+Key Members(slack_usernames): paigerube14, rook, mffiedler, mohit, dry923, rsevilla, ravielluri
 * [**#sig-scalability on Kubernetes Slack**](https://kubernetes.slack.com)
-* [**#forum-perfscale on CoreOS Slack**](https://coreos.slack.com)
+* [**#forum-chaos on CoreOS Slack internal to Red Hat**](https://coreos.slack.com)
+* [**#forum-perfscale on CoreOS Slack internal to Red Hat**](https://coreos.slack.com)
0 kraken/cerberus/__init__.py Normal file
48 kraken/cerberus/setup.py Normal file
@@ -0,0 +1,48 @@
import logging
import requests
import sys


# Get cerberus status
def get_status(config):
    cerberus_status = True
    if config["cerberus"]["cerberus_enabled"]:
        cerberus_url = config["cerberus"]["cerberus_url"]
        if not cerberus_url:
            logging.error("url where Cerberus publishes True/False signal is not provided.")
            sys.exit(1)
        cerberus_status = requests.get(cerberus_url).content
        cerberus_status = True if cerberus_status == b"True" else False
        if not cerberus_status:
            logging.error(
                "Received a no-go signal from Cerberus, looks like "
                "the cluster is unhealthy. Please check the Cerberus "
                "report for more details. Test failed."
            )
            sys.exit(1)
        else:
            logging.info("Received a go signal from Cerberus, the cluster is healthy. " "Test passed.")
    return cerberus_status


# Function to publish kraken status to cerberus
def publish_kraken_status(config, failed_post_scenarios):
    cerberus_status = get_status(config)
    if not cerberus_status:
        if failed_post_scenarios:
            if config["kraken"]["exit_on_failure"]:
                logging.info(
                    "Cerberus status is not healthy and post action scenarios " "are still failing, exiting kraken run"
                )
                sys.exit(1)
            else:
                logging.info("Cerberus status is not healthy and post action scenarios " "are still failing")
    else:
        if failed_post_scenarios:
            if config["kraken"]["exit_on_failure"]:
                logging.info(
                    "Cerberus status is healthy but post action scenarios " "are still failing, exiting kraken run"
                )
                sys.exit(1)
            else:
                logging.info("Cerberus status is healthy but post action scenarios " "are still failing")
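For orientation, a minimal usage sketch of the new module (not part of the commit): it assumes a kraken config YAML containing the `cerberus` and `kraken` sections these functions read; the config path and the empty failed-scenario list are hypothetical.

import yaml
import kraken.cerberus.setup as cerberus

# Hypothetical config path; the real run loads the kraken config YAML elsewhere.
with open("config/config.yaml", "r") as f:
    config = yaml.full_load(f)

# True when Cerberus is disabled or reports a healthy cluster; a no-go signal exits the run.
cerberus_status = cerberus.get_status(config)

# Re-checks Cerberus and logs or exits based on kraken.exit_on_failure and any failed post actions.
cerberus.publish_kraken_status(config, failed_post_scenarios=[])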
@@ -2,6 +2,52 @@ import kraken.invoke.command as runcommand
import logging
import time
import sys
import requests
import yaml
import kraken.cerberus.setup as cerberus


# Inject litmus scenarios defined in the config
def run(scenarios_list, config, litmus_namespaces, litmus_uninstall, wait_duration):
    # Loop to run the scenarios starts here
    for l_scenario in scenarios_list:
        try:
            for item in l_scenario:
                runcommand.invoke("kubectl apply -f %s" % item)
                if "http" in item:
                    f = requests.get(item)
                    yaml_item = list(yaml.safe_load_all(f.content))[0]
                else:
                    with open(item, "r") as f:
                        logging.info("opened yaml" + str(item))
                        yaml_item = list(yaml.safe_load_all(f))[0]

                if yaml_item["kind"] == "ChaosEngine":
                    engine_name = yaml_item["metadata"]["name"]
                    namespace = yaml_item["metadata"]["namespace"]
                    litmus_namespaces.append(namespace)
                    experiment_names = yaml_item["spec"]["experiments"]
                    for expr in experiment_names:
                        expr_name = expr["name"]
                        experiment_result = check_experiment(engine_name, expr_name, namespace)
                        if experiment_result:
                            logging.info("Scenario: %s has been successfully injected!" % item)
                        else:
                            logging.info("Scenario: %s was not successfully injected!" % item)
                            if litmus_uninstall:
                                for l_item in l_scenario:
                                    logging.info("item " + str(l_item))
                                    runcommand.invoke("kubectl delete -f %s" % l_item)
            if litmus_uninstall:
                for item in l_scenario:
                    logging.info("item " + str(item))
                    runcommand.invoke("kubectl delete -f %s" % item)
            logging.info("Waiting for the specified duration: %s" % wait_duration)
            time.sleep(wait_duration)
            cerberus.get_status(config)
        except Exception as e:
            logging.error("Failed to run litmus scenario: %s. Encountered " "the following exception: %s" % (item, e))
    return litmus_namespaces


# Install litmus and wait until pod is running
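A hedged sketch of how this entry point is driven, based on the calls shown in the run_kraken.py hunk further down; the Litmus version, scenario file path, and wait duration below are made-up illustration values.

import yaml
import kraken.litmus.common_litmus as common_litmus

with open("config/config.yaml", "r") as f:      # hypothetical kraken config path
    config = yaml.full_load(f)

litmus_version = "1.9.1"                        # assumed version string, not from this commit
scenarios_list = [["scenarios/chaos_engine.yaml"]]   # ChaosEngine manifests (local path or URL)

common_litmus.install_litmus(litmus_version)
common_litmus.deploy_all_experiments(litmus_version)
namespaces = common_litmus.run(scenarios_list, config, [], litmus_uninstall=True, wait_duration=60)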
@@ -2,8 +2,20 @@ import time
import random
import logging
import paramiko
import yaml
import sys
import kraken.kubernetes.client as kubecli
import kraken.invoke.command as runcommand
import kraken.cerberus.setup as cerberus
import kraken.node_actions.common_node_functions as nodeaction
from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
from kraken.node_actions.az_node_scenarios import azure_node_scenarios
from kraken.node_actions.gcp_node_scenarios import gcp_node_scenarios
from kraken.node_actions.openstack_node_scenarios import openstack_node_scenarios


node_general = False


# Pick a random node with specified label selector
@@ -70,3 +82,94 @@ def check_service_status(node, service, ssh_private_key, timeout):
        if service_status.strip() != "active":
            logging.error("Service %s is in %s state" % (service_name, service_status.strip()))
    ssh.close()


# Run defined scenarios
def run(scenarios_list, config, wait_duration):
    for node_scenario_config in scenarios_list:
        with open(node_scenario_config, "r") as f:
            node_scenario_config = yaml.full_load(f)
            for node_scenario in node_scenario_config["node_scenarios"]:
                node_scenario_object = get_node_scenario_object(node_scenario)
                if node_scenario["actions"]:
                    for action in node_scenario["actions"]:
                        inject_node_scenario(action, node_scenario, node_scenario_object)
                        logging.info("Waiting for the specified duration: %s" % (wait_duration))
                        time.sleep(wait_duration)
                        cerberus.get_status(config)
                        logging.info("")


# Inject the specified node scenario
def inject_node_scenario(action, node_scenario, node_scenario_object):
    generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")
    # Get the node scenario configurations
    instance_kill_count = node_scenario.get("instance_kill_count", 1)
    node_name = node_scenario.get("node_name", "")
    label_selector = node_scenario.get("label_selector", "")
    timeout = node_scenario.get("timeout", 120)
    service = node_scenario.get("service", "")
    ssh_private_key = node_scenario.get("ssh_private_key", "~/.ssh/id_rsa")
    # Get the node to apply the scenario
    node = nodeaction.get_node(node_name, label_selector)

    if node_general and action not in generic_cloud_scenarios:
        logging.info("Scenario: " + action + " is not set up for generic cloud type, skipping action")
    else:
        if action == "node_start_scenario":
            node_scenario_object.node_start_scenario(instance_kill_count, node, timeout)
        elif action == "node_stop_scenario":
            node_scenario_object.node_stop_scenario(instance_kill_count, node, timeout)
        elif action == "node_stop_start_scenario":
            node_scenario_object.node_stop_start_scenario(instance_kill_count, node, timeout)
        elif action == "node_termination_scenario":
            node_scenario_object.node_termination_scenario(instance_kill_count, node, timeout)
        elif action == "node_reboot_scenario":
            node_scenario_object.node_reboot_scenario(instance_kill_count, node, timeout)
        elif action == "stop_start_kubelet_scenario":
            node_scenario_object.stop_start_kubelet_scenario(instance_kill_count, node, timeout)
        elif action == "stop_kubelet_scenario":
            node_scenario_object.stop_kubelet_scenario(instance_kill_count, node, timeout)
        elif action == "node_crash_scenario":
            node_scenario_object.node_crash_scenario(instance_kill_count, node, timeout)
        elif action == "stop_start_helper_node_scenario":
            if node_scenario["cloud_type"] != "openstack":
                logging.error(
                    "Scenario: " + action + " is not supported for "
                    "cloud type " + node_scenario["cloud_type"] + ", skipping action"
                )
            else:
                if not node_scenario["helper_node_ip"]:
                    logging.error("Helper node IP address is not provided")
                    sys.exit(1)
                node_scenario_object.helper_node_stop_start_scenario(
                    instance_kill_count, node_scenario["helper_node_ip"], timeout
                )
                node_scenario_object.helper_node_service_status(
                    node_scenario["helper_node_ip"], service, ssh_private_key, timeout
                )
        else:
            logging.info("There is no node action that matches %s, skipping scenario" % action)


# Get the node scenarios object of specified cloud type
def get_node_scenario_object(node_scenario):
    if "cloud_type" not in node_scenario.keys() or node_scenario["cloud_type"] == "generic":
        global node_general
        node_general = True
        return general_node_scenarios()
    if node_scenario["cloud_type"] == "aws":
        return aws_node_scenarios()
    elif node_scenario["cloud_type"] == "gcp":
        return gcp_node_scenarios()
    elif node_scenario["cloud_type"] == "openstack":
        return openstack_node_scenarios()
    elif node_scenario["cloud_type"] == "azure" or node_scenario["cloud_type"] == "az":
        return azure_node_scenarios()
    else:
        logging.error(
            "Cloud type " + node_scenario["cloud_type"] + " is not currently supported; "
            "try using 'generic' if wanting to stop/start kubelet or fork bomb on any "
            "cluster"
        )
        sys.exit(1)
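To make the expected scenario shape concrete, here is a sketch of one parsed `node_scenarios` entry and how the two helpers above consume it; only the key names come from the code, the values are hypothetical.

# Hedged sketch: assumes get_node_scenario_object() and inject_node_scenario() from the
# hunk above are in scope, plus a reachable cluster for nodeaction.get_node().
node_scenario = {
    "cloud_type": "aws",                                  # one of: aws, gcp, openstack, azure/az, generic
    "actions": ["node_stop_start_scenario"],
    "label_selector": "node-role.kubernetes.io/worker",   # hypothetical selector
    "instance_kill_count": 1,
    "timeout": 120,
}
node_scenario_object = get_node_scenario_object(node_scenario)
for action in node_scenario["actions"]:
    inject_node_scenario(action, node_scenario, node_scenario_object)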
0 kraken/pod_scenarios/__init__.py Normal file
36 kraken/pod_scenarios/setup.py Normal file
@@ -0,0 +1,36 @@
import logging
import kraken.invoke.command as runcommand
import kraken.cerberus.setup as cerberus
import kraken.post_actions.actions as post_actions
import time


# Run pod based scenarios
def run(kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration):
    try:
        # Loop to run the scenarios starts here
        for pod_scenario in scenarios_list:
            if len(pod_scenario) > 1:
                pre_action_output = post_actions.run(kubeconfig_path, pod_scenario[1])
            else:
                pre_action_output = ""
            scenario_logs = runcommand.invoke(
                "powerfulseal autonomous --use-pod-delete-instead-"
                "of-ssh-kill --policy-file %s --kubeconfig %s "
                "--no-cloud --inventory-kubernetes --headless" % (pod_scenario[0], kubeconfig_path)
            )

            # Display pod scenario logs/actions
            print(scenario_logs)

            logging.info("Scenario: %s has been successfully injected!" % (pod_scenario[0]))
            logging.info("Waiting for the specified duration: %s" % (wait_duration))
            time.sleep(wait_duration)

            failed_post_scenarios = post_actions.check_recovery(
                kubeconfig_path, pod_scenario, failed_post_scenarios, pre_action_output
            )
            cerberus.publish_kraken_status(config, failed_post_scenarios)
    except Exception as e:
        logging.error("Failed to run scenario: %s. Encountered the following " "exception: %s" % (pod_scenario[0], e))
    return failed_post_scenarios
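A short usage sketch (not from the commit): each scenario entry is a list whose first element is the PowerfulSeal policy file and whose optional second element is a post-action check, mirroring how `pod_scenario[0]` and `pod_scenario[1]` are used above; the file paths and kubeconfig location are hypothetical.

import yaml
import kraken.pod_scenarios.setup as pod_scenarios

with open("config/config.yaml", "r") as f:      # hypothetical kraken config path
    config = yaml.full_load(f)

# Each entry: [PowerfulSeal policy file, optional post-action check script]; paths are made up.
scenarios_list = [
    ["scenarios/etcd.yml", "scenarios/post_action_etcd.py"],
    ["scenarios/openshift_pod_kill.yml"],
]
failed_post_scenarios = pod_scenarios.run("~/.kube/config", scenarios_list, config, [], wait_duration=60)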
0 kraken/post_actions/__init__.py Normal file
60 kraken/post_actions/actions.py Normal file
@@ -0,0 +1,60 @@
import logging
import kraken.invoke.command as runcommand


def run(kubeconfig_path, scenario, pre_action_output=""):

    if scenario.endswith(".yaml") or scenario.endswith(".yml"):
        action_output = runcommand.invoke(
            "powerfulseal autonomous "
            "--use-pod-delete-instead-of-ssh-kill"
            " --policy-file %s --kubeconfig %s --no-cloud"
            " --inventory-kubernetes --headless" % (scenario, kubeconfig_path)
        )
        # read output to make sure no error
        if "ERROR" in action_output:
            action_output.split("ERROR")[1].split("\n")[0]
            if not pre_action_output:
                logging.info("Powerful seal pre action check failed for " + str(scenario))
            return False
        else:
            logging.info(scenario + " post action checks passed")

    elif scenario.endswith(".py"):
        action_output = runcommand.invoke("python3 " + scenario).strip()
        if pre_action_output:
            if pre_action_output == action_output:
                logging.info(scenario + " post action checks passed")
            else:
                logging.info(scenario + " post action response did not match pre check output")
                return False
    elif scenario != "":
        # invoke custom bash script
        action_output = runcommand.invoke(scenario).strip()
        if pre_action_output:
            if pre_action_output == action_output:
                logging.info(scenario + " post action checks passed")
            else:
                logging.info(scenario + " post action response did not match pre check output")
                return False

    return action_output


# Perform the post scenario actions to see if components recovered
def check_recovery(kubeconfig_path, scenario, failed_post_scenarios, pre_action_output):

    for failed_scenario in failed_post_scenarios:
        post_action_output = run(kubeconfig_path, failed_scenario[0], failed_scenario[1])
        if post_action_output is not False:
            failed_post_scenarios.remove(failed_scenario)
        else:
            logging.info("Post action scenario " + str(failed_scenario) + " is still failing")

    # check post actions
    if len(scenario) > 1:
        post_action_output = run(kubeconfig_path, scenario[1], pre_action_output)
        if post_action_output is False:
            failed_post_scenarios.append([scenario[1], pre_action_output])

    return failed_post_scenarios
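A hedged sketch of the pre/post contract: `run()` returns the check's output, or `False` when the check fails, and `check_recovery()` re-runs previously failed checks and records any new mismatch for the current scenario; the paths below are hypothetical.

import kraken.post_actions.actions as post_actions

kubeconfig_path = "~/.kube/config"              # hypothetical
post_check = "scenarios/post_action_etcd.py"    # hypothetical check script

# Capture the pre-injection state.
pre_action_output = post_actions.run(kubeconfig_path, post_check)

# ... chaos gets injected here ...

# Re-run failed checks and record any new mismatch for this scenario.
failed_post_scenarios = post_actions.check_recovery(
    kubeconfig_path, ["scenarios/etcd.yml", post_check], [], pre_action_output
)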
@@ -5,6 +5,8 @@ import kraken.invoke.command as runcommand
import kraken.kubernetes.client as kubecli
import re
import sys
import kraken.cerberus.setup as cerberus
import yaml


def pod_exec(pod_name, command, namespace):

@@ -134,3 +136,17 @@ def check_date_time(object_type, names):
        if counter < max_retries:
            logging.info("Date in pod " + str(pod_name[0]) + " reset properly")
    return not_reset


def run(scenarios_list, config, wait_duration):
    for time_scenario_config in scenarios_list:
        with open(time_scenario_config, "r") as f:
            scenario_config = yaml.full_load(f)
            for time_scenario in scenario_config["time_scenarios"]:
                object_type, object_names = skew_time(time_scenario)
                not_reset = check_date_time(object_type, object_names)
                if len(not_reset) > 0:
                    logging.info("Object times were not reset")
                logging.info("Waiting for the specified duration: %s" % (wait_duration))
                time.sleep(wait_duration)
                cerberus.publish_kraken_status(config, not_reset)
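A minimal sketch of invoking the new time-skew entry point (not part of the commit); the scenario file name is hypothetical, but it must carry the top-level `time_scenarios` list that `run()` iterates.

import yaml
import kraken.time_actions.common_time_functions as time_actions

with open("config/config.yaml", "r") as f:      # hypothetical kraken config path
    config = yaml.full_load(f)

# The file must carry a top-level "time_scenarios" list; the name here is made up.
time_actions.run(["scenarios/time_scenarios_example.yml"], config, wait_duration=60)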
306 run_kraken.py
@@ -2,304 +2,17 @@

import os
import sys
import time
import yaml
import logging
import optparse
import requests
import pyfiglet
import kraken.kubernetes.client as kubecli
import kraken.invoke.command as runcommand
import kraken.litmus.common_litmus as common_litmus
import kraken.node_actions.common_node_functions as nodeaction
from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
from kraken.node_actions.az_node_scenarios import azure_node_scenarios
from kraken.node_actions.gcp_node_scenarios import gcp_node_scenarios
from kraken.node_actions.openstack_node_scenarios import openstack_node_scenarios
import kraken.time_actions.common_time_functions as time_actions
import kraken.performance_dashboards.setup as performance_dashboards

node_general = False


# Get the node scenarios object of specfied cloud type
def get_node_scenario_object(node_scenario):
    if "cloud_type" not in node_scenario.keys() or node_scenario["cloud_type"] == "generic":
        global node_general
        node_general = True
        return general_node_scenarios()
    if node_scenario["cloud_type"] == "aws":
        return aws_node_scenarios()
    elif node_scenario["cloud_type"] == "gcp":
        return gcp_node_scenarios()
    elif node_scenario["cloud_type"] == "openstack":
        return openstack_node_scenarios()
    elif node_scenario["cloud_type"] == "azure" or node_scenario["cloud_type"] == "az":
        return azure_node_scenarios()
    else:
        logging.error(
            "Cloud type " + node_scenario["cloud_type"] + " is not currently supported; "
            "try using 'generic' if wanting to stop/start kubelet or fork bomb on any "
            "cluster"
        )
        sys.exit(1)


# Inject the specified node scenario
def inject_node_scenario(action, node_scenario, node_scenario_object):
    generic_cloud_scenarios = ("stop_kubelet_scenario", "node_crash_scenario")
    # Get the node scenario configurations
    instance_kill_count = node_scenario.get("instance_kill_count", 1)
    node_name = node_scenario.get("node_name", "")
    label_selector = node_scenario.get("label_selector", "")
    timeout = node_scenario.get("timeout", 120)
    service = node_scenario.get("service", "")
    ssh_private_key = node_scenario.get("ssh_private_key", "~/.ssh/id_rsa")
    # Get the node to apply the scenario
    node = nodeaction.get_node(node_name, label_selector)

    if node_general and action not in generic_cloud_scenarios:
        logging.info("Scenario: " + action + " is not set up for generic cloud type, skipping action")
    else:
        if action == "node_start_scenario":
            node_scenario_object.node_start_scenario(instance_kill_count, node, timeout)
        elif action == "node_stop_scenario":
            node_scenario_object.node_stop_scenario(instance_kill_count, node, timeout)
        elif action == "node_stop_start_scenario":
            node_scenario_object.node_stop_start_scenario(instance_kill_count, node, timeout)
        elif action == "node_termination_scenario":
            node_scenario_object.node_termination_scenario(instance_kill_count, node, timeout)
        elif action == "node_reboot_scenario":
            node_scenario_object.node_reboot_scenario(instance_kill_count, node, timeout)
        elif action == "stop_start_kubelet_scenario":
            node_scenario_object.stop_start_kubelet_scenario(instance_kill_count, node, timeout)
        elif action == "stop_kubelet_scenario":
            node_scenario_object.stop_kubelet_scenario(instance_kill_count, node, timeout)
        elif action == "node_crash_scenario":
            node_scenario_object.node_crash_scenario(instance_kill_count, node, timeout)
        elif action == "stop_start_helper_node_scenario":
            if node_scenario["cloud_type"] != "openstack":
                logging.error(
                    "Scenario: " + action + " is not supported for "
                    "cloud type " + node_scenario["cloud_type"] + ", skipping action"
                )
            else:
                if not node_scenario["helper_node_ip"]:
                    logging.error("Helper node IP address is not provided")
                    sys.exit(1)
                node_scenario_object.helper_node_stop_start_scenario(
                    instance_kill_count, node_scenario["helper_node_ip"], timeout
                )
                node_scenario_object.helper_node_service_status(
                    node_scenario["helper_node_ip"], service, ssh_private_key, timeout
                )
        else:
            logging.info("There is no node action that matches %s, skipping scenario" % action)


# Get cerberus status
def cerberus_integration(config):
    cerberus_status = True
    if config["cerberus"]["cerberus_enabled"]:
        cerberus_url = config["cerberus"]["cerberus_url"]
        if not cerberus_url:
            logging.error("url where Cerberus publishes True/False signal is not provided.")
            sys.exit(1)
        cerberus_status = requests.get(cerberus_url).content
        cerberus_status = True if cerberus_status == b"True" else False
        if not cerberus_status:
            logging.error(
                "Received a no-go signal from Cerberus, looks like "
                "the cluster is unhealthy. Please check the Cerberus "
                "report for more details. Test failed."
            )
            sys.exit(1)
        else:
            logging.info("Received a go signal from Ceberus, the cluster is healthy. " "Test passed.")
    return cerberus_status


# Function to publish kraken status to cerberus
def publish_kraken_status(config, failed_post_scenarios):
    cerberus_status = cerberus_integration(config)
    if not cerberus_status:
        if failed_post_scenarios:
            if config["kraken"]["exit_on_failure"]:
                logging.info(
                    "Cerberus status is not healthy and post action scenarios " "are still failing, exiting kraken run"
                )
                sys.exit(1)
            else:
                logging.info("Cerberus status is not healthy and post action scenarios " "are still failing")
    else:
        if failed_post_scenarios:
            if config["kraken"]["exit_on_failure"]:
                logging.info(
                    "Cerberus status is healthy but post action scenarios " "are still failing, exiting kraken run"
                )
                sys.exit(1)
            else:
                logging.info("Cerberus status is healthy but post action scenarios " "are still failing")


def run_post_action(kubeconfig_path, scenario, pre_action_output=""):

    if scenario.endswith(".yaml") or scenario.endswith(".yml"):
        action_output = runcommand.invoke(
            "powerfulseal autonomous "
            "--use-pod-delete-instead-of-ssh-kill"
            " --policy-file %s --kubeconfig %s --no-cloud"
            " --inventory-kubernetes --headless" % (scenario, kubeconfig_path)
        )
        # read output to make sure no error
        if "ERROR" in action_output:
            action_output.split("ERROR")[1].split("\n")[0]
            if not pre_action_output:
                logging.info("Powerful seal pre action check failed for " + str(scenario))
            return False
        else:
            logging.info(scenario + " post action checks passed")

    elif scenario.endswith(".py"):
        action_output = runcommand.invoke("python3 " + scenario).strip()
        if pre_action_output:
            if pre_action_output == action_output:
                logging.info(scenario + " post action checks passed")
            else:
                logging.info(scenario + " post action response did not match pre check output")
                return False
    elif scenario != "":
        # invoke custom bash script
        action_output = runcommand.invoke(scenario).strip()
        if pre_action_output:
            if pre_action_output == action_output:
                logging.info(scenario + " post action checks passed")
            else:
                logging.info(scenario + " post action response did not match pre check output")
                return False

    return action_output


# Perform the post scenario actions to see if components recovered
def post_actions(kubeconfig_path, scenario, failed_post_scenarios, pre_action_output):

    for failed_scenario in failed_post_scenarios:
        post_action_output = run_post_action(kubeconfig_path, failed_scenario[0], failed_scenario[1])
        if post_action_output is not False:
            failed_post_scenarios.remove(failed_scenario)
        else:
            logging.info("Post action scenario " + str(failed_scenario) + "is still failing")

    # check post actions
    if len(scenario) > 1:
        post_action_output = run_post_action(kubeconfig_path, scenario[1], pre_action_output)
        if post_action_output is False:
            failed_post_scenarios.append([scenario[1], pre_action_output])

    return failed_post_scenarios


def pod_scenarios(scenarios_list, config, failed_post_scenarios):
    try:
        # Loop to run the scenarios starts here
        for pod_scenario in scenarios_list:
            if len(pod_scenario) > 1:
                pre_action_output = run_post_action(kubeconfig_path, pod_scenario[1])
            else:
                pre_action_output = ""
            scenario_logs = runcommand.invoke(
                "powerfulseal autonomous --use-pod-delete-instead-"
                "of-ssh-kill --policy-file %s --kubeconfig %s "
                "--no-cloud --inventory-kubernetes --headless" % (pod_scenario[0], kubeconfig_path)
            )

            # Display pod scenario logs/actions
            print(scenario_logs)

            logging.info("Scenario: %s has been successfully injected!" % (pod_scenario[0]))
            logging.info("Waiting for the specified duration: %s" % (wait_duration))
            time.sleep(wait_duration)

            failed_post_scenarios = post_actions(
                kubeconfig_path, pod_scenario, failed_post_scenarios, pre_action_output
            )
            publish_kraken_status(config, failed_post_scenarios)
    except Exception as e:
        logging.error("Failed to run scenario: %s. Encountered the following " "exception: %s" % (pod_scenario[0], e))
    return failed_post_scenarios


def node_scenarios(scenarios_list, config):
    for node_scenario_config in scenarios_list:
        with open(node_scenario_config, "r") as f:
            node_scenario_config = yaml.full_load(f)
            for node_scenario in node_scenario_config["node_scenarios"]:
                node_scenario_object = get_node_scenario_object(node_scenario)
                if node_scenario["actions"]:
                    for action in node_scenario["actions"]:
                        inject_node_scenario(action, node_scenario, node_scenario_object)
                        logging.info("Waiting for the specified duration: %s" % (wait_duration))
                        time.sleep(wait_duration)
                        cerberus_integration(config)
                        logging.info("")


def time_scenarios(scenarios_list, config):
    for time_scenario_config in scenarios_list:
        with open(time_scenario_config, "r") as f:
            scenario_config = yaml.full_load(f)
            for time_scenario in scenario_config["time_scenarios"]:
                object_type, object_names = time_actions.skew_time(time_scenario)
                not_reset = time_actions.check_date_time(object_type, object_names)
                if len(not_reset) > 0:
                    logging.info("Object times were not reset")
                logging.info("Waiting for the specified duration: %s" % (wait_duration))
                time.sleep(wait_duration)
                publish_kraken_status(config, not_reset)


def litmus_scenarios(scenarios_list, config, litmus_namespaces, litmus_uninstall):
    # Loop to run the scenarios starts here
    for l_scenario in scenarios_list:
        try:
            for item in l_scenario:
                runcommand.invoke("kubectl apply -f %s" % item)
                if "http" in item:
                    f = requests.get(item)
                    yaml_item = list(yaml.safe_load_all(f.content))[0]
                else:
                    with open(item, "r") as f:
                        logging.info("opened yaml" + str(item))
                        yaml_item = list(yaml.safe_load_all(f))[0]

                if yaml_item["kind"] == "ChaosEngine":
                    engine_name = yaml_item["metadata"]["name"]
                    namespace = yaml_item["metadata"]["namespace"]
                    litmus_namespaces.append(namespace)
                    experiment_names = yaml_item["spec"]["experiments"]
                    for expr in experiment_names:
                        expr_name = expr["name"]
                        experiment_result = common_litmus.check_experiment(engine_name, expr_name, namespace)
                        if experiment_result:
                            logging.info("Scenario: %s has been successfully injected!" % item)
                        else:
                            logging.info("Scenario: %s was not successfully injected!" % item)
                            if litmus_uninstall:
                                for l_item in l_scenario:
                                    logging.info("item " + str(l_item))
                                    runcommand.invoke("kubectl delete -f %s" % l_item)
            if litmus_uninstall:
                for item in l_scenario:
                    logging.info("item " + str(item))
                    runcommand.invoke("kubectl delete -f %s" % item)
            logging.info("Waiting for the specified duration: %s" % wait_duration)
            time.sleep(wait_duration)
            cerberus_integration(config)
        except Exception as e:
            logging.error("Failed to run litmus scenario: %s. Encountered " "the following exception: %s" % (item, e))
    return litmus_namespaces
import kraken.pod_scenarios.setup as pod_scenarios
import kraken.node_actions.common_node_functions as nodeaction


# Main function
@@ -373,22 +86,27 @@ def main(cfg):
         if scenarios_list:
             # Inject pod chaos scenarios specified in the config
             if scenario_type == "pod_scenarios":
-                failed_post_scenarios = pod_scenarios(scenarios_list, config, failed_post_scenarios)
+                failed_post_scenarios = pod_scenarios.run(
+                    kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration,
+                )

             # Inject node chaos scenarios specified in the config
             elif scenario_type == "node_scenarios":
-                node_scenarios(scenarios_list, config)
+                nodeaction.run(scenarios_list, config, wait_duration)

             # Inject time skew chaos scenarios specified in the config
             elif scenario_type == "time_scenarios":
-                time_scenarios(scenarios_list, config)
+                time_actions.run(scenarios_list, config, wait_duration)
             elif scenario_type == "litmus_scenarios":
                 if not litmus_installed:
                     common_litmus.install_litmus(litmus_version)
                     common_litmus.deploy_all_experiments(litmus_version)
                     litmus_installed = True
-                litmus_namespaces = litmus_scenarios(
-                    scenarios_list, config, litmus_namespaces, litmus_uninstall
+                litmus_namespaces = common_litmus.run(
+                    scenarios_list, config, litmus_namespaces, litmus_uninstall, wait_duration,
                 )

         iteration += 1