diff --git a/config/config_arcaflow.yaml b/config/config_arcaflow.yaml
new file mode 100644
index 00000000..d2cda6b5
--- /dev/null
+++ b/config/config_arcaflow.yaml
@@ -0,0 +1,39 @@
+kraken:
+    distribution: openshift                     # Distribution can be kubernetes or openshift
+    kubeconfig_path: ~/.kube/config             # Path to kubeconfig
+    exit_on_failure: False                      # Exit when a post action scenario fails
+    publish_kraken_status: True                 # Can be accessed at http://0.0.0.0:8081
+    signal_state: RUN                           # When set to PAUSE, waits for the RUN signal before running the scenarios; refer to docs/signal.md for more details
+    signal_address: 0.0.0.0                     # Signal listening address
+    port: 8081                                  # Signal port
+    litmus_install: True                        # Installs the specified version; set to False if it is already set up
+    litmus_version: v1.13.6                     # Litmus version to install
+    litmus_uninstall: False                     # Uninstall Litmus on failure
+    litmus_uninstall_before_run: True           # Uninstall Litmus before a new run starts
+    chaos_scenarios:                            # List of policies/chaos scenarios to load
+        - arcaflow_scenarios:
+            - scenarios/arcaflow/sysbench/input.yaml
+            - scenarios/arcaflow/kill-pod/input.yaml
+
+cerberus:
+    cerberus_enabled: False                     # Enable only when Cerberus is already installed
+    cerberus_url:                               # When cerberus_enabled is set to True, provide the url where Cerberus publishes the go/no-go signal
+    check_applicaton_routes: False              # When enabled, checks for application unavailability using the routes specified in the Cerberus config and fails the run
+
+performance_monitoring:
+    deploy_dashboards: False                    # Install a mutable Grafana and load the performance dashboards; enable this only when running on OpenShift
+    repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
+    kube_burner_binary_url: "https://github.com/cloud-bulldozer/kube-burner/releases/download/v0.9.1/kube-burner-0.9.1-Linux-x86_64.tar.gz"
+    capture_metrics: False
+    config_path: config/kube_burner.yaml        # Define the Elasticsearch url and index name in this config
+    metrics_profile_path: config/metrics-aggregated.yaml
+    prometheus_url:                             # The Prometheus url/route is obtained automatically on OpenShift; set it when the distribution is Kubernetes
+    prometheus_bearer_token:                    # The bearer token is obtained automatically on OpenShift; set it when the distribution is Kubernetes, as it is needed to authenticate with Prometheus
+    uuid:                                       # uuid for the run; generated by default if not set
+    enable_alerts: False                        # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error
+    alert_profile: config/alerts                # Path to alert profile with the Prometheus queries
+
+tunings:
+    wait_duration: 60                           # Duration to wait between each chaos scenario
+    iterations: 1                               # Number of times to execute the scenarios
+    daemon_mode: False                          # Iterations are set to infinity, which means Kraken will cause chaos forever
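Each entry under arcaflow_scenarios points at a scenario's input.yaml. As buildArgs in the new plugin module below requires, workflow.yaml and config.yaml must sit in the same directory; a hypothetical layout for the first bundled scenario:

```
scenarios/arcaflow/sysbench/
├── input.yaml      # workflow input; the kubeconfig is injected into it at run time
├── workflow.yaml   # arcaflow workflow definition
└── config.yaml     # arcaflow engine configuration
```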
diff --git a/kraken/arcaflow_plugin/__init__.py b/kraken/arcaflow_plugin/__init__.py
new file mode 100644
index 00000000..d45a7a4f
--- /dev/null
+++ b/kraken/arcaflow_plugin/__init__.py
@@ -0,0 +1 @@
+from .arcaflow_plugin import *
\ No newline at end of file
diff --git a/kraken/arcaflow_plugin/arcaflow_plugin.py b/kraken/arcaflow_plugin/arcaflow_plugin.py
new file mode 100644
index 00000000..4a47e654
--- /dev/null
+++ b/kraken/arcaflow_plugin/arcaflow_plugin.py
@@ -0,0 +1,145 @@
+import arcaflow
+import base64
+import os
+import yaml
+from pathlib import Path
+from typing import List
+
+
+def run(scenarios_list: List[str], kubeconfig_path: str):
+    for scenario in scenarios_list:
+        engineArgs = buildArgs(scenario)
+        runWorkflow(engineArgs, kubeconfig_path)
+
+
+def runWorkflow(engineArgs: arcaflow.EngineArgs, kubeconfig_path: str):
+    setArcaKubeConfig(engineArgs, kubeconfig_path)
+    arcaflow.run(engineArgs)
+
+
+def buildArgs(input: str) -> arcaflow.EngineArgs:
+    """
+    Builds the arcaflow engine arguments from the scenario input file path;
+    workflow.yaml and config.yaml must live in the same directory as input.yaml.
+    """
+    context = Path(input).parent
+    workflow = "{}/workflow.yaml".format(context)
+    config = "{}/config.yaml".format(context)
+    if not os.path.exists(context):
+        raise Exception(
+            "context folder for arcaflow workflow not found: {}".format(context)
+        )
+    if not os.path.exists(input):
+        raise Exception(
+            "input file for arcaflow workflow not found: {}".format(input)
+        )
+    if not os.path.exists(workflow):
+        raise Exception(
+            "workflow file for arcaflow workflow not found: {}".format(workflow)
+        )
+    if not os.path.exists(config):
+        raise Exception(
+            "configuration file for arcaflow workflow not found: {}".format(config)
+        )
+
+    engineArgs = arcaflow.EngineArgs()
+    engineArgs.context = context
+    engineArgs.config = config
+    engineArgs.input = input
+    return engineArgs
+
+
+def setArcaKubeConfig(engineArgs: arcaflow.EngineArgs, kubeconfig_path: str):
+    """Sets the kubeconfig built by buildArcaKubeConfig as an input to the arcaflow workflow"""
+    kubeconfig_str = buildArcaKubeConfig(kubeconfig_path)
+    with open(engineArgs.input, "r") as stream:
+        input = yaml.safe_load(stream)
+    input["kubeconfig"] = kubeconfig_str
+    with open(engineArgs.input, "w") as stream:
+        yaml.safe_dump(input, stream)
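+
+
+# Note: setArcaKubeConfig above rewrites the scenario's input.yaml in place
+# (the file is mutated on every run); the injected top-level "kubeconfig"
+# key, produced by buildArcaKubeConfig below, is what the workflow consumes.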
+
+
+def buildArcaKubeConfig(kubeconfig_path: str) -> str:
+    """
+    Builds an arcaflow-compatible kubeconfig representation and returns it as a string.
+    In order to run arcaflow plugins in kubernetes/openshift, the kubeconfig must contain
+    the client certificate/key and the server certificate base64-encoded within the
+    kubeconfig file itself, in *-data fields. That is not always the case; in fact, a
+    kubeconfig may contain filesystem paths to those files instead. This function builds
+    an arcaflow-compatible kubeconfig and returns it as a string that can be safely
+    embedded in input.yaml.
+    """
+    if not os.path.exists(kubeconfig_path):
+        raise Exception("kubeconfig not found in {}".format(kubeconfig_path))
+
+    with open(kubeconfig_path, "r") as stream:
+        try:
+            kubeconfig = yaml.safe_load(stream)
+        except yaml.YAMLError:
+            raise Exception(
+                "impossible to read kubeconfig file in: {}".format(kubeconfig_path)
+            )
+
+    if "current-context" not in kubeconfig.keys():
+        raise Exception(
+            "invalid kubeconfig file, impossible to determine current-context"
+        )
+    userId = None
+    clusterId = None
+    userName = None
+    clusterName = None
+    currentContext = kubeconfig["current-context"]
+    for context in kubeconfig["contexts"]:
+        if context["name"] == currentContext:
+            userName = context["context"]["user"]
+            clusterName = context["context"]["cluster"]
+    if userName is None:
+        raise Exception(
+            "user not set for context {} in kubeconfig file".format(currentContext)
+        )
+    if clusterName is None:
+        raise Exception(
+            "cluster not set for context {} in kubeconfig file".format(currentContext)
+        )
+
+    for index, user in enumerate(kubeconfig["users"]):
+        if user["name"] == userName:
+            userId = index
+    for index, cluster in enumerate(kubeconfig["clusters"]):
+        if cluster["name"] == clusterName:
+            clusterId = index
+
+    if userId is None:
+        raise Exception("no user {} found in kubeconfig users".format(userName))
+    if clusterId is None:
+        raise Exception(
+            "no cluster {} found in kubeconfig clusters".format(clusterName)
+        )
+
+    # Inline each file-based credential as a base64-encoded *-data field and
+    # drop the original path-based key.
+    if "client-certificate" in kubeconfig["users"][userId]["user"]:
+        file = kubeconfig["users"][userId]["user"]["client-certificate"]
+        if not os.path.exists(file):
+            raise Exception("user certificate not found: {}".format(file))
+        with open(file, "rb") as file_stream:
+            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
+        kubeconfig["users"][userId]["user"]["client-certificate-data"] = encoded_file
+        del kubeconfig["users"][userId]["user"]["client-certificate"]
+
+    if "client-key" in kubeconfig["users"][userId]["user"]:
+        file = kubeconfig["users"][userId]["user"]["client-key"]
+        if not os.path.exists(file):
+            raise Exception("user key not found: {}".format(file))
+        with open(file, "rb") as file_stream:
+            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
+        kubeconfig["users"][userId]["user"]["client-key-data"] = encoded_file
+        del kubeconfig["users"][userId]["user"]["client-key"]
+
+    if "certificate-authority" in kubeconfig["clusters"][clusterId]["cluster"]:
+        file = kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority"]
+        if not os.path.exists(file):
+            raise Exception("cluster certificate not found: {}".format(file))
+        with open(file, "rb") as file_stream:
+            encoded_file = base64.b64encode(file_stream.read()).decode("utf-8")
+        kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority-data"] = encoded_file
+        del kubeconfig["clusters"][clusterId]["cluster"]["certificate-authority"]
+
+    kubeconfig_str = yaml.dump(kubeconfig)
+    return kubeconfig_str
diff --git a/requirements.txt b/requirements.txt
index 878db138..c59df675 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -32,3 +32,4 @@ wheel
 service_identity
 git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
 git+https://github.com/arcalot/arcaflow-plugin-kill-pod.git@main
+arcaflow >= 0.3.0
\ No newline at end of file
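The new module is driven from run_kraken.py (next diff), but it can also be exercised directly. A minimal sketch, assuming the kraken package is importable and the bundled sysbench scenario directory exists:

```python
import os

import kraken.arcaflow_plugin as arcaflow_plugin

# For each input.yaml: inline the kubeconfig, then run the adjacent workflow.
arcaflow_plugin.run(
    ["scenarios/arcaflow/sysbench/input.yaml"],  # scenario input files
    os.path.expanduser("~/.kube/config"),        # kubeconfig to serialize and inject
)
```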
diff --git a/run_kraken.py b/run_kraken.py
index adad22ff..7ff9f104 100644
--- a/run_kraken.py
+++ b/run_kraken.py
@@ -22,6 +22,7 @@ import kraken.zone_outage.actions as zone_outages
 import kraken.application_outage.actions as application_outage
 import kraken.pvc.pvc_scenario as pvc_scenario
 import kraken.network_chaos.actions as network_chaos
+import kraken.arcaflow_plugin as arcaflow_plugin
 import server as server
 from kraken import plugins
@@ -44,11 +45,11 @@ def main(cfg):
         config = yaml.full_load(f)
         global kubeconfig_path, wait_duration
         distribution = config["kraken"].get("distribution", "openshift")
-        kubeconfig_path = os.path.expanduser(config["kraken"].get("kubeconfig_path", ""))
-        chaos_scenarios = config["kraken"].get("chaos_scenarios", [])
-        publish_running_status = config["kraken"].get(
-            "publish_kraken_status", False
+        kubeconfig_path = os.path.expanduser(
+            config["kraken"].get("kubeconfig_path", "")
         )
+        chaos_scenarios = config["kraken"].get("chaos_scenarios", [])
+        publish_running_status = config["kraken"].get("publish_kraken_status", False)
         port = config["kraken"].get("port")
         signal_address = config["kraken"].get("signal_address")
         run_signal = config["kraken"].get("signal_state", "RUN")
@@ -65,15 +66,12 @@
             "deploy_dashboards", False
         )
         dashboard_repo = config["performance_monitoring"].get(
-            "repo",
-            "https://github.com/cloud-bulldozer/performance-dashboards.git"
-        )
-        capture_metrics = config["performance_monitoring"].get(
-            "capture_metrics", False
+            "repo", "https://github.com/cloud-bulldozer/performance-dashboards.git"
         )
+        capture_metrics = config["performance_monitoring"].get("capture_metrics", False)
         kube_burner_url = config["performance_monitoring"].get(
             "kube_burner_binary_url",
-            KUBE_BURNER_URL.format(version=KUBE_BURNER_VERSION)
+            KUBE_BURNER_URL.format(version=KUBE_BURNER_VERSION),
         )
         config_path = config["performance_monitoring"].get(
             "config_path", "config/kube_burner.yaml"
@@ -81,25 +79,18 @@
         metrics_profile = config["performance_monitoring"].get(
             "metrics_profile_path", "config/metrics-aggregated.yaml"
         )
-        prometheus_url = config["performance_monitoring"].get(
-            "prometheus_url", ""
-        )
+        prometheus_url = config["performance_monitoring"].get("prometheus_url", "")
         prometheus_bearer_token = config["performance_monitoring"].get(
             "prometheus_bearer_token", ""
         )
         run_uuid = config["performance_monitoring"].get("uuid", "")
-        enable_alerts = config["performance_monitoring"].get(
-            "enable_alerts", False
-        )
-        alert_profile = config["performance_monitoring"].get(
-            "alert_profile", ""
-        )
+        enable_alerts = config["performance_monitoring"].get("enable_alerts", False)
+        alert_profile = config["performance_monitoring"].get("alert_profile", "")
 
         # Initialize clients
         if not os.path.isfile(kubeconfig_path):
             logging.error(
-                "Cannot read the kubeconfig file at %s, please check" %
-                kubeconfig_path
+                "Cannot read the kubeconfig file at %s, please check" % kubeconfig_path
             )
             sys.exit(1)
         logging.info("Initializing client to talk to the Kubernetes cluster")
@@ -111,14 +102,10 @@
         # Set up kraken url to track signal
         if not 0 <= int(port) <= 65535:
-            logging.error(
-                "%s isn't a valid port number, please check" % (port)
-            )
+            logging.error("%s isn't a valid port number, please check" % (port))
             sys.exit(1)
         if not signal_address:
-            logging.error(
-                "Please set the signal address in the config"
-            )
+            logging.error("Please set the signal address in the config")
             sys.exit(1)
         address = (signal_address, port)
@@ -128,12 +115,11 @@
             server_address = address[0]
             port = address[1]
             logging.info(
-                "Publishing kraken status at http://%s:%s" % (
-                    server_address,
-                    port
-                )
+                "Publishing kraken status at http://%s:%s" % (server_address, port)
             )
-            logging.info("Publishing kraken status at http://%s:%s" % (server_address, port))
             server.start_server(address, run_signal)
 
         # Cluster info
@@ -165,15 +151,13 @@
         # Set the number of iterations to loop to infinity if daemon mode is
         # enabled or else set it to the provided iterations count in the config
         if daemon_mode:
-            logging.info(
-                "Daemon mode enabled, kraken will cause chaos forever\n"
-            )
+            logging.info("Daemon mode enabled, kraken will cause chaos forever\n")
             logging.info("Ignoring the iterations set")
             iterations = float("inf")
         else:
             logging.info(
-                "Daemon mode not enabled, will run through %s iterations\n" %
-                str(iterations)
+                "Daemon mode not enabled, will run through %s iterations\n"
+                % str(iterations)
             )
             iterations = int(iterations)
@@ -195,8 +179,7 @@
             while publish_running_status and run_signal == "PAUSE":
                 logging.info(
                     "Pausing Kraken run, waiting for %s seconds"
-                    " and will re-poll signal"
-                    % str(wait_duration)
+                    " and will re-poll signal" % str(wait_duration)
                 )
                 time.sleep(wait_duration)
                 run_signal = server.get_status(address)
@@ -214,52 +197,46 @@
                         "kill-pods configuration instead."
                     )
                     sys.exit(1)
+                elif scenario_type == "arcaflow_scenarios":
+                    # arcaflow_plugin.run returns nothing; do not overwrite
+                    # the accumulated failed_post_scenarios with its result.
+                    arcaflow_plugin.run(scenarios_list, kubeconfig_path)
+
                 elif scenario_type == "plugin_scenarios":
                     failed_post_scenarios = plugins.run(
                         scenarios_list,
                         kubeconfig_path,
                         failed_post_scenarios,
-                        wait_duration
+                        wait_duration,
                     )
                 elif scenario_type == "container_scenarios":
                     logging.info("Running container scenarios")
-                    failed_post_scenarios = \
-                        pod_scenarios.container_run(
-                            kubeconfig_path,
-                            scenarios_list,
-                            config,
-                            failed_post_scenarios,
-                            wait_duration
-                        )
+                    failed_post_scenarios = pod_scenarios.container_run(
+                        kubeconfig_path,
+                        scenarios_list,
+                        config,
+                        failed_post_scenarios,
+                        wait_duration,
+                    )
 
                 # Inject node chaos scenarios specified in the config
                 elif scenario_type == "node_scenarios":
                     logging.info("Running node scenarios")
-                    nodeaction.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
-                    )
+                    nodeaction.run(scenarios_list, config, wait_duration)
 
                 # Inject managedcluster chaos scenarios specified in the config
                 elif scenario_type == "managedcluster_scenarios":
                     logging.info("Running managedcluster scenarios")
                     managedcluster_scenarios.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
-                    )
+                        scenarios_list, config, wait_duration
+                    )
 
                 # Inject time skew chaos scenarios specified
                 # in the config
                 elif scenario_type == "time_scenarios":
                     if distribution == "openshift":
                         logging.info("Running time skew scenarios")
-                        time_actions.run(
-                            scenarios_list,
-                            config,
-                            wait_duration
-                        )
+                        time_actions.run(scenarios_list, config, wait_duration)
                     else:
                         logging.error(
                             "Litmus scenarios are currently "
@@ -275,24 +252,19 @@
                     if litmus_install:
                         # Remove Litmus resources
                         # before running the scenarios
-                        common_litmus.delete_chaos(
-                            litmus_namespace
-                        )
+                        common_litmus.delete_chaos(litmus_namespace)
                         common_litmus.delete_chaos_experiments(
                             litmus_namespace
                         )
                         if litmus_uninstall_before_run:
                             common_litmus.uninstall_litmus(
-                                litmus_version,
-                                litmus_namespace
+                                litmus_version, litmus_namespace
                             )
                         common_litmus.install_litmus(
-                            litmus_version,
-                            litmus_namespace
+                            litmus_version, litmus_namespace
                         )
                         common_litmus.deploy_all_experiments(
-                            litmus_version,
-                            litmus_namespace
+                            litmus_version, litmus_namespace
                         )
                         litmus_installed = True
                     common_litmus.run(
@@ -311,11 +283,7 @@
 
                 # Inject cluster shutdown scenarios
                 elif scenario_type == "cluster_shut_down_scenarios":
-                    shut_down.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
-                    )
+                    shut_down.run(scenarios_list, config, wait_duration)
 
                 # Inject namespace chaos scenarios
                 elif scenario_type == "namespace_scenarios":
@@ -325,25 +293,19 @@
                         config,
                         wait_duration,
                         failed_post_scenarios,
-                        kubeconfig_path
+                        kubeconfig_path,
                     )
 
                 # Inject zone failures
                 elif scenario_type == "zone_outages":
                     logging.info("Inject zone outages")
-                    zone_outages.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
-                    )
+                    zone_outages.run(scenarios_list, config, wait_duration)
 
                 # Application outages
                 elif scenario_type == "application_outages":
                     logging.info("Injecting application outage")
                     application_outage.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
+                        scenarios_list, config, wait_duration
                     )
 
                 # PVC scenarios
@@ -354,11 +316,7 @@
                 # Network scenarios
                 elif scenario_type == "network_chaos":
                     logging.info("Running Network Chaos")
-                    network_chaos.run(
-                        scenarios_list,
-                        config,
-                        wait_duration
-                    )
+                    network_chaos.run(scenarios_list, config, wait_duration)
 
             iteration += 1
             logging.info("")
@@ -412,8 +370,7 @@
         run_dir = os.getcwd() + "/kraken.report"
         logging.info(
             "Successfully finished running Kraken. UUID for the run: "
-            "%s. Report generated at %s. Exiting"
-            % (run_uuid, run_dir)
+            "%s. Report generated at %s. Exiting" % (run_uuid, run_dir)
         )
     else:
         logging.error("Cannot find a config at %s, please check" % (cfg))
@@ -436,7 +393,7 @@ if __name__ == "__main__":
         format="%(asctime)s [%(levelname)s] %(message)s",
         handlers=[
             logging.FileHandler("kraken.report", mode="w"),
-            logging.StreamHandler()
+            logging.StreamHandler(),
         ],
     )
     if options.cfg is None:
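For reviewers, a minimal sketch of the per-credential transformation buildArcaKubeConfig performs: a path-valued key such as client-certificate is read, base64-encoded, stored under the corresponding *-data key, and the path-based key is dropped (the path /tmp/client.crt is illustrative):

```python
import base64

user = {"client-certificate": "/tmp/client.crt"}  # hypothetical kubeconfig fragment

with open(user["client-certificate"], "rb") as f:
    user["client-certificate-data"] = base64.b64encode(f.read()).decode("utf-8")
del user["client-certificate"]
# user now carries the certificate inline, in the form arcaflow expects.
```

The same pattern applies to client-key and certificate-authority, which is why the resulting kubeconfig string can be embedded directly in input.yaml.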