arcaflow integration (#384)

arcaflow library version

Co-authored-by: Tullio Sebastiani <tsebasti@redhat.com>
Tullio Sebastiani
2023-03-08 12:01:03 +01:00
committed by GitHub
parent 0534e03c48
commit fee4f7d2bf
5 changed files with 236 additions and 93 deletions

View File

@@ -0,0 +1,39 @@
kraken:
    distribution: openshift  # Distribution can be kubernetes or openshift
    kubeconfig_path: ~/.kube/config  # Path to kubeconfig
    exit_on_failure: False  # Exit when a post action scenario fails
    publish_kraken_status: True  # Can be accessed at http://0.0.0.0:8081
    signal_state: RUN  # Will wait for the RUN signal when set to PAUSE before running the scenarios, refer to docs/signal.md for more details
    signal_address: 0.0.0.0  # Signal listening address
    port: 8081  # Signal port
    litmus_install: True  # Installs the specified version, set to False if it's already set up
    litmus_version: v1.13.6  # Litmus version to install
    litmus_uninstall: False  # Uninstall litmus if the run fails
    litmus_uninstall_before_run: True  # Uninstall litmus before a new run starts
    chaos_scenarios:  # List of policies/chaos scenarios to load
        - arcaflow_scenarios:
            - scenarios/arcaflow/sysbench/input.yaml
            - scenarios/arcaflow/kill-pod/input.yaml

cerberus:
    cerberus_enabled: False  # Enable it when cerberus is previously installed
    cerberus_url:  # When cerberus_enabled is set to True, provide the url where cerberus publishes the go/no-go signal
    check_applicaton_routes: False  # When enabled, looks for application unavailability using the routes specified in the cerberus config and fails the run

performance_monitoring:
    deploy_dashboards: False  # Install a mutable grafana and load the performance dashboards. Enable this only when running on OpenShift
    repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
    kube_burner_binary_url: "https://github.com/cloud-bulldozer/kube-burner/releases/download/v0.9.1/kube-burner-0.9.1-Linux-x86_64.tar.gz"
    capture_metrics: False
    config_path: config/kube_burner.yaml  # Define the Elasticsearch url and index name in this config
    metrics_profile_path: config/metrics-aggregated.yaml
    prometheus_url:  # The prometheus url/route is automatically obtained on OpenShift, please set it when the distribution is Kubernetes
    prometheus_bearer_token:  # The bearer token is automatically obtained on OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus
    uuid:  # uuid for the run, generated by default if not set
    enable_alerts: False  # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error
    alert_profile: config/alerts  # Path to the alert profile with the prometheus queries

tunings:
    wait_duration: 60  # Duration to wait between each chaos scenario
    iterations: 1  # Number of times to execute the scenarios
    daemon_mode: False  # Iterations are set to infinity, which means Kraken will cause chaos forever
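
Each arcaflow_scenarios entry points at an input.yaml file; the plugin added in this commit (arcaflow_plugin.py below) treats the folder containing that file as the workflow context and expects a workflow.yaml and a config.yaml next to it. A sketch of the assumed layout for the two scenarios referenced above (only the two input.yaml paths appear in the config; the sibling file names come from the checks in buildArgs):

scenarios/arcaflow/
    sysbench/
        input.yaml      # workflow input; the kubeconfig is injected here at runtime
        workflow.yaml   # arcaflow workflow definition
        config.yaml     # arcaflow engine/deployer configuration
    kill-pod/
        input.yaml
        workflow.yaml
        config.yaml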

View File

@@ -0,0 +1 @@
from .arcaflow_plugin import *

View File

@@ -0,0 +1,145 @@
import arcaflow
import os
import yaml
import base64
from pathlib import Path
from typing import List


def run(scenarios_list: List[str], kubeconfig_path: str):
    for scenario in scenarios_list:
        engineArgs = buildArgs(scenario)
        runWorkflow(engineArgs, kubeconfig_path)


def runWorkflow(engineArgs: arcaflow.EngineArgs, kubeconfig_path: str):
    setArcaKubeConfig(engineArgs, kubeconfig_path)
    arcaflow.run(engineArgs)


def buildArgs(input: str) -> arcaflow.EngineArgs:
    """
    Builds the arcaflow EngineArgs from the scenario input file path: the folder
    containing input.yaml is used as the workflow context and must also contain
    workflow.yaml and config.yaml.
    """
    context = Path(input).parent
    workflow = "{}/workflow.yaml".format(context)
    config = "{}/config.yaml".format(context)
    if not os.path.exists(context):
        raise Exception(
            "context folder for arcaflow workflow not found: {}".format(context)
        )
    if not os.path.exists(input):
        raise Exception(
            "input file for arcaflow workflow not found: {}".format(input)
        )
    if not os.path.exists(workflow):
        raise Exception(
            "workflow file for arcaflow workflow not found: {}".format(workflow)
        )
    if not os.path.exists(config):
        raise Exception(
            "configuration file for arcaflow workflow not found: {}".format(config)
        )
    engineArgs = arcaflow.EngineArgs()
    engineArgs.context = context
    engineArgs.config = config
    engineArgs.input = input
    return engineArgs


def setArcaKubeConfig(engineArgs: arcaflow.EngineArgs, kubeconfig_path: str):
    """Sets the kubeconfig built by buildArcaKubeConfig as an input to the arcaflow workflow"""
    kubeconfig_str = buildArcaKubeConfig(kubeconfig_path)
    with open(engineArgs.input, "r") as stream:
        input = yaml.safe_load(stream)
    input["kubeconfig"] = kubeconfig_str
    with open(engineArgs.input, "w") as stream:
        yaml.safe_dump(input, stream)


def buildArcaKubeConfig(kubeconfig_path: str) -> str:
    """
    Builds an arcaflow-compatible kubeconfig representation and returns it as a string.
    To run arcaflow plugins in kubernetes/openshift, the kubeconfig must embed the client
    certificate/key and the server certificate base64-encoded in the *-data fields. That
    is not always the case; in fact, a kubeconfig may contain filesystem paths to those
    files instead. This function builds an arcaflow-compatible kubeconfig and returns it
    as a string that can be safely included in input.yaml.
    """
    if not os.path.exists(kubeconfig_path):
        raise Exception("kubeconfig not found in {}".format(kubeconfig_path))
    with open(kubeconfig_path, "r") as stream:
        try:
            kubeconfig = yaml.safe_load(stream)
        except Exception:
            raise Exception(
                "impossible to read kubeconfig file in: {}".format(kubeconfig_path)
            )
    if "current-context" not in kubeconfig.keys():
        raise Exception(
            "invalid kubeconfig file, impossible to determine current-context"
        )
    userId = None
    clusterId = None
    userName = None
    clusterName = None
    currentContext = kubeconfig["current-context"]
    # Resolve the user and cluster referenced by the current context
    for context in kubeconfig["contexts"]:
        if context["name"] == currentContext:
            userName = context["context"]["user"]
            clusterName = context["context"]["cluster"]
    if userName is None:
        raise Exception(
            "user not set for context {} in kubeconfig file".format(currentContext)
        )
    if clusterName is None:
        raise Exception(
            "cluster not set for context {} in kubeconfig file".format(currentContext)
        )
    for index, user in enumerate(kubeconfig["users"]):
        if user["name"] == userName:
            userId = index
    for index, cluster in enumerate(kubeconfig["clusters"]):
        if cluster["name"] == clusterName:
            clusterId = index
    if userId is None:
        raise Exception("no user {} found in kubeconfig users".format(userName))
    if clusterId is None:
        raise Exception("no cluster {} found in kubeconfig clusters".format(clusterName))
    # Inline client certificate, client key and cluster CA as base64-encoded *-data fields
    if "client-certificate" in kubeconfig["users"][userId]["user"]:
        file = kubeconfig["users"][userId]["user"]["client-certificate"]
        if not os.path.exists(file):
            raise Exception("user certificate not found: {}".format(file))
        with open(file, "rb") as file_stream:
            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
        kubeconfig["users"][userId]["user"]["client-certificate-data"] = encoded_file
        del kubeconfig["users"][userId]["user"]["client-certificate"]
    if "client-key" in kubeconfig["users"][userId]["user"]:
        file = kubeconfig["users"][userId]["user"]["client-key"]
        if not os.path.exists(file):
            raise Exception("user key not found: {}".format(file))
        with open(file, "rb") as file_stream:
            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
        kubeconfig["users"][userId]["user"]["client-key-data"] = encoded_file
        del kubeconfig["users"][userId]["user"]["client-key"]
    if "certificate-authority" in kubeconfig["clusters"][clusterId]["cluster"]:
        file = kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority"]
        if not os.path.exists(file):
            raise Exception("cluster certificate not found: {}".format(file))
        with open(file, "rb") as file_stream:
            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
        kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority-data"] = encoded_file
        del kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority"]
    kubeconfig_str = yaml.dump(kubeconfig)
    return kubeconfig_str
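
For reference, a minimal sketch of how this module is driven, mirroring the arcaflow_scenarios branch added to run_kraken.py further down in this commit. The scenario paths are the ones from the sample config above; running it obviously assumes the kraken package and those scenario folders are present on disk.

import os
import kraken.arcaflow_plugin as arcaflow_plugin

kubeconfig_path = os.path.expanduser("~/.kube/config")
scenarios_list = [
    "scenarios/arcaflow/sysbench/input.yaml",
    "scenarios/arcaflow/kill-pod/input.yaml",
]
# Each input.yaml gets the certificate-inlined kubeconfig injected, then the
# sibling workflow.yaml is executed by the arcaflow engine with config.yaml.
arcaflow_plugin.run(scenarios_list, kubeconfig_path)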

View File

@@ -32,3 +32,4 @@ wheel
service_identity
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
git+https://github.com/arcalot/arcaflow-plugin-kill-pod.git@main
arcaflow >= 0.3.0

View File

@@ -22,6 +22,7 @@ import kraken.zone_outage.actions as zone_outages
import kraken.application_outage.actions as application_outage
import kraken.pvc.pvc_scenario as pvc_scenario
import kraken.network_chaos.actions as network_chaos
import kraken.arcaflow_plugin as arcaflow_plugin
import server as server
from kraken import plugins
@@ -44,11 +45,11 @@ def main(cfg):
config = yaml.full_load(f)
global kubeconfig_path, wait_duration
distribution = config["kraken"].get("distribution", "openshift")
kubeconfig_path = os.path.expanduser(config["kraken"].get("kubeconfig_path", ""))
chaos_scenarios = config["kraken"].get("chaos_scenarios", [])
publish_running_status = config["kraken"].get(
"publish_kraken_status", False
kubeconfig_path = os.path.expanduser(
config["kraken"].get("kubeconfig_path", "")
)
chaos_scenarios = config["kraken"].get("chaos_scenarios", [])
publish_running_status = config["kraken"].get("publish_kraken_status", False)
port = config["kraken"].get("port")
signal_address = config["kraken"].get("signal_address")
run_signal = config["kraken"].get("signal_state", "RUN")
@@ -65,15 +66,12 @@ def main(cfg):
"deploy_dashboards", False
)
dashboard_repo = config["performance_monitoring"].get(
"repo",
"https://github.com/cloud-bulldozer/performance-dashboards.git"
)
capture_metrics = config["performance_monitoring"].get(
"capture_metrics", False
"repo", "https://github.com/cloud-bulldozer/performance-dashboards.git"
)
capture_metrics = config["performance_monitoring"].get("capture_metrics", False)
kube_burner_url = config["performance_monitoring"].get(
"kube_burner_binary_url",
KUBE_BURNER_URL.format(version=KUBE_BURNER_VERSION)
KUBE_BURNER_URL.format(version=KUBE_BURNER_VERSION),
)
config_path = config["performance_monitoring"].get(
"config_path", "config/kube_burner.yaml"
@@ -81,25 +79,18 @@ def main(cfg):
metrics_profile = config["performance_monitoring"].get(
"metrics_profile_path", "config/metrics-aggregated.yaml"
)
prometheus_url = config["performance_monitoring"].get(
"prometheus_url", ""
)
prometheus_url = config["performance_monitoring"].get("prometheus_url", "")
prometheus_bearer_token = config["performance_monitoring"].get(
"prometheus_bearer_token", ""
)
run_uuid = config["performance_monitoring"].get("uuid", "")
enable_alerts = config["performance_monitoring"].get(
"enable_alerts", False
)
alert_profile = config["performance_monitoring"].get(
"alert_profile", ""
)
enable_alerts = config["performance_monitoring"].get("enable_alerts", False)
alert_profile = config["performance_monitoring"].get("alert_profile", "")
# Initialize clients
if not os.path.isfile(kubeconfig_path):
logging.error(
"Cannot read the kubeconfig file at %s, please check" %
kubeconfig_path
"Cannot read the kubeconfig file at %s, please check" % kubeconfig_path
)
sys.exit(1)
logging.info("Initializing client to talk to the Kubernetes cluster")
@@ -111,14 +102,10 @@ def main(cfg):
# Set up kraken url to track signal
if not 0 <= int(port) <= 65535:
logging.error(
"%s isn't a valid port number, please check" % (port)
)
logging.error("%s isn't a valid port number, please check" % (port))
sys.exit(1)
if not signal_address:
logging.error(
"Please set the signal address in the config"
)
logging.error("Please set the signal address in the config")
sys.exit(1)
address = (signal_address, port)
@@ -128,12 +115,11 @@ def main(cfg):
server_address = address[0]
port = address[1]
logging.info(
"Publishing kraken status at http://%s:%s" % (
server_address,
port
)
"Publishing kraken status at http://%s:%s" % (server_address, port)
)
logging.info(
"Publishing kraken status at http://%s:%s" % (server_address, port)
)
logging.info("Publishing kraken status at http://%s:%s" % (server_address, port))
server.start_server(address, run_signal)
# Cluster info
@@ -165,15 +151,13 @@ def main(cfg):
# Set the number of iterations to loop to infinity if daemon mode is
# enabled or else set it to the provided iterations count in the config
if daemon_mode:
logging.info(
"Daemon mode enabled, kraken will cause chaos forever\n"
)
logging.info("Daemon mode enabled, kraken will cause chaos forever\n")
logging.info("Ignoring the iterations set")
iterations = float("inf")
else:
logging.info(
"Daemon mode not enabled, will run through %s iterations\n" %
str(iterations)
"Daemon mode not enabled, will run through %s iterations\n"
% str(iterations)
)
iterations = int(iterations)
@@ -195,8 +179,7 @@ def main(cfg):
while publish_running_status and run_signal == "PAUSE":
logging.info(
"Pausing Kraken run, waiting for %s seconds"
" and will re-poll signal"
% str(wait_duration)
" and will re-poll signal" % str(wait_duration)
)
time.sleep(wait_duration)
run_signal = server.get_status(address)
@@ -214,52 +197,46 @@ def main(cfg):
"kill-pods configuration instead."
)
sys.exit(1)
elif scenario_type == "arcaflow_scenarios":
failed_post_scenarios = arcaflow_plugin.run(
scenarios_list, kubeconfig_path
)
elif scenario_type == "plugin_scenarios":
failed_post_scenarios = plugins.run(
scenarios_list,
kubeconfig_path,
failed_post_scenarios,
wait_duration
wait_duration,
)
elif scenario_type == "container_scenarios":
logging.info("Running container scenarios")
failed_post_scenarios = \
pod_scenarios.container_run(
kubeconfig_path,
scenarios_list,
config,
failed_post_scenarios,
wait_duration
)
failed_post_scenarios = pod_scenarios.container_run(
kubeconfig_path,
scenarios_list,
config,
failed_post_scenarios,
wait_duration,
)
# Inject node chaos scenarios specified in the config
elif scenario_type == "node_scenarios":
logging.info("Running node scenarios")
nodeaction.run(
scenarios_list,
config,
wait_duration
)
nodeaction.run(scenarios_list, config, wait_duration)
# Inject managedcluster chaos scenarios specified in the config
elif scenario_type == "managedcluster_scenarios":
logging.info("Running managedcluster scenarios")
managedcluster_scenarios.run(
scenarios_list,
config,
wait_duration
)
scenarios_list, config, wait_duration
)
# Inject time skew chaos scenarios specified
# in the config
elif scenario_type == "time_scenarios":
if distribution == "openshift":
logging.info("Running time skew scenarios")
time_actions.run(
scenarios_list,
config,
wait_duration
)
time_actions.run(scenarios_list, config, wait_duration)
else:
logging.error(
"Litmus scenarios are currently "
@@ -275,24 +252,19 @@ def main(cfg):
if litmus_install:
# Remove Litmus resources
# before running the scenarios
common_litmus.delete_chaos(
litmus_namespace
)
common_litmus.delete_chaos(litmus_namespace)
common_litmus.delete_chaos_experiments(
litmus_namespace
)
if litmus_uninstall_before_run:
common_litmus.uninstall_litmus(
litmus_version,
litmus_namespace
litmus_version, litmus_namespace
)
common_litmus.install_litmus(
litmus_version,
litmus_namespace
litmus_version, litmus_namespace
)
common_litmus.deploy_all_experiments(
litmus_version,
litmus_namespace
litmus_version, litmus_namespace
)
litmus_installed = True
common_litmus.run(
@@ -311,11 +283,7 @@ def main(cfg):
# Inject cluster shutdown scenarios
elif scenario_type == "cluster_shut_down_scenarios":
shut_down.run(
scenarios_list,
config,
wait_duration
)
shut_down.run(scenarios_list, config, wait_duration)
# Inject namespace chaos scenarios
elif scenario_type == "namespace_scenarios":
@@ -325,25 +293,19 @@ def main(cfg):
config,
wait_duration,
failed_post_scenarios,
kubeconfig_path
kubeconfig_path,
)
# Inject zone failures
elif scenario_type == "zone_outages":
logging.info("Inject zone outages")
zone_outages.run(
scenarios_list,
config,
wait_duration
)
zone_outages.run(scenarios_list, config, wait_duration)
# Application outages
elif scenario_type == "application_outages":
logging.info("Injecting application outage")
application_outage.run(
scenarios_list,
config,
wait_duration
scenarios_list, config, wait_duration
)
# PVC scenarios
@@ -354,11 +316,7 @@ def main(cfg):
# Network scenarios
elif scenario_type == "network_chaos":
logging.info("Running Network Chaos")
network_chaos.run(
scenarios_list,
config,
wait_duration
)
network_chaos.run(scenarios_list, config, wait_duration)
iteration += 1
logging.info("")
@@ -412,8 +370,7 @@ def main(cfg):
run_dir = os.getcwd() + "/kraken.report"
logging.info(
"Successfully finished running Kraken. UUID for the run: "
"%s. Report generated at %s. Exiting"
% (run_uuid, run_dir)
"%s. Report generated at %s. Exiting" % (run_uuid, run_dir)
)
else:
logging.error("Cannot find a config at %s, please check" % (cfg))
@@ -436,7 +393,7 @@ if __name__ == "__main__":
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("kraken.report", mode="w"),
logging.StreamHandler()
logging.StreamHandler(),
],
)
if options.cfg is None: