Files
krkn/run_kraken.py
Paige Patton bb636cd3a9 Custom weight to resiliency (#1173)
* feat(resiliency): implement comprehensive resiliency scoring system

- Added resiliency scoring engine
- Implemented scenario-wise scoring with telemetry
- Added configurable SLOs and detailed reporting

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* fix: check prometheus url after openshift prometheus check

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* custom weight

Signed-off-by: Paige Patton <prubenda@redhat.com>

---------

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
2026-03-19 13:14:08 -04:00

838 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
import datetime
import json
import os
import sys
import yaml
import logging
import optparse
from colorlog import ColoredFormatter
import pyfiglet
import uuid
import time
import queue
import threading
from typing import Optional, Dict
from krkn import cerberus
from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.models.elastic import ElasticChaosRunTelemetry
from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.prometheus as prometheus_plugin
import server as server
from krkn.resiliency.resiliency import (
Resiliency
)
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.models.k8s import ResiliencyReport
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
from krkn.utils import TeeLogHandler, ErrorCollectionHandler
from krkn.utils.HealthChecker import HealthChecker
from krkn.utils.VirtChecker import VirtChecker
from krkn.scenario_plugins.scenario_plugin_factory import (
ScenarioPluginFactory,
ScenarioPluginNotFound,
)
from krkn.rollback.config import RollbackConfig
from krkn.rollback.command import (
list_rollback as list_rollback_command,
execute_rollback as execute_rollback_command,
)
# removes TripleDES warning
import warnings
warnings.filterwarnings(action='ignore', module='.*paramiko.*')
report_file = ""
# Main function
def main(options, command: Optional[str]) -> int:
# Start kraken
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
cfg = options.cfg
# Parse and read the config
if os.path.isfile(cfg):
with open(cfg, "r") as f:
config = yaml.full_load(f)
global kubeconfig_path, wait_duration, kraken_config
kubeconfig_path = os.path.expanduser(
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
)
kraken_config = cfg
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
publish_running_status = get_yaml_item_value(
config["kraken"], "publish_kraken_status", False
)
port = get_yaml_item_value(config["kraken"], "port", 8081)
RollbackConfig.register(
auto=get_yaml_item_value(
config["kraken"],
"auto_rollback",
False
),
versions_directory=get_yaml_item_value(
config["kraken"],
"rollback_versions_directory",
"/tmp/kraken-rollback"
),
)
signal_address = get_yaml_item_value(
config["kraken"], "signal_address", "0.0.0.0"
)
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
resiliency_config = get_yaml_item_value(config,"resiliency",{})
# Determine execution mode (standalone, controller, or disabled)
run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
valid_run_modes = {"standalone", "detailed", "disabled"}
if run_mode not in valid_run_modes:
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
run_mode = "standalone"
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
prometheus_url = config["performance_monitoring"].get("prometheus_url")
prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
run_uuid = config["performance_monitoring"].get("uuid")
enable_alerts = get_yaml_item_value(
config["performance_monitoring"], "enable_alerts", False
)
enable_metrics = get_yaml_item_value(
config["performance_monitoring"], "enable_metrics", False
)
# Default placeholder; will be overridden if a Prometheus URL is available
prometheus = None
# elastic search
enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
elastic_url = get_yaml_item_value(config["elastic"], "elastic_url", "")
elastic_verify_certs = get_yaml_item_value(
config["elastic"], "verify_certs", False
)
elastic_port = get_yaml_item_value(config["elastic"], "elastic_port", 32766)
elastic_username = get_yaml_item_value(config["elastic"], "username", "")
elastic_password = get_yaml_item_value(config["elastic"], "password", "")
elastic_metrics_index = get_yaml_item_value(
config["elastic"], "metrics_index", "krkn-metrics"
)
elastic_alerts_index = get_yaml_item_value(
config["elastic"], "alerts_index", "krkn-alerts"
)
elastic_telemetry_index = get_yaml_item_value(
config["elastic"], "telemetry_index", "krkn-telemetry"
)
alert_profile = config["performance_monitoring"].get("alert_profile")
metrics_profile = config["performance_monitoring"].get("metrics_profile")
check_critical_alerts = get_yaml_item_value(
config["performance_monitoring"], "check_critical_alerts", False
)
telemetry_api_url = config["telemetry"].get("api_url")
health_check_config = get_yaml_item_value(config, "health_checks",{})
kubevirt_check_config = get_yaml_item_value(config, "kubevirt_checks", {})
# Initialize clients
if not os.path.isfile(kubeconfig_path) and not os.path.isfile(
"/var/run/secrets/kubernetes.io/serviceaccount/token"
):
logging.error(
"Cannot read the kubeconfig file at %s, please check" % kubeconfig_path
)
return -1
logging.info("Initializing client to talk to the Kubernetes cluster")
# Set Cerberus url if enabled
cerberus.set_url(config)
# Generate uuid for the run
if run_uuid:
logging.info(
"Using the uuid defined by the user for the run: %s" % run_uuid
)
else:
run_uuid = str(uuid.uuid4())
logging.info("Generated a uuid for the run: %s" % run_uuid)
# request_id for telemetry is generated once here and used everywhere
telemetry_request_id = f"{int(time.time())}-{run_uuid}"
if config["telemetry"].get("run_tag"):
telemetry_request_id = (
f"{telemetry_request_id}-{config['telemetry']['run_tag']}"
)
telemetry_log_file = (
f'{config["telemetry"]["archive_path"]}/{telemetry_request_id}.log'
)
safe_logger = SafeLogger(filename=telemetry_log_file)
try:
kubeconfig_path
os.environ["KUBECONFIG"] = str(kubeconfig_path)
# krkn-lib-kubernetes init
kubecli = KrknKubernetes(kubeconfig_path=kubeconfig_path)
ocpcli = KrknOpenshift(kubeconfig_path=kubeconfig_path)
except:
kubecli.initialize_clients(None)
distribution = "kubernetes"
if ocpcli.is_openshift():
distribution = "openshift"
logging.info("Detected distribution %s" % (distribution))
# find node kraken might be running on
kubecli.find_kraken_node()
# Set up kraken url to track signal
if not 0 <= int(port) <= 65535:
logging.error("%s isn't a valid port number, please check" % (port))
return -1
if not signal_address:
logging.error("Please set the signal address in the config")
return -1
address = (signal_address, port)
# If publish_running_status is False this should keep us going
# in our loop below
if publish_running_status:
server_address = address[0]
port = address[1]
logging.info(
"Publishing kraken status at http://%s:%s" % (server_address, port)
)
server.start_server(address, run_signal)
# Cluster info
logging.info("Fetching cluster info")
cv = ""
if distribution == "openshift":
cv = ocpcli.get_clusterversion_string()
if not prometheus_url:
try:
connection_data = ocpcli.get_prometheus_api_connection_data()
if connection_data:
prometheus_url = connection_data.endpoint
prometheus_bearer_token = connection_data.token
else:
# If can't make a connection, set alerts to false
enable_alerts = False
check_critical_alerts = False
except Exception:
logging.error(
"invalid distribution selected, running openshift scenarios against kubernetes cluster."
"Please set 'kubernetes' in config.yaml krkn.platform and try again"
)
return -1
if cv != "":
logging.info(cv)
else:
logging.info("Cluster version CRD not detected, skipping")
# Final check: ensure Prometheus URL is available; disable resiliency if not
if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
logging.warning("Prometheus URL not provided; disabling resiliency score features.")
run_mode = "disabled"
# KrknTelemetry init
telemetry_k8s = KrknTelemetryKubernetes(
safe_logger, kubecli, config["telemetry"]
)
telemetry_ocp = KrknTelemetryOpenshift(
safe_logger, ocpcli, telemetry_request_id, config["telemetry"]
)
if enable_elastic:
logging.info(f"Elastic collection enabled at: {elastic_url}:{elastic_port}")
elastic_search = KrknElastic(
safe_logger,
elastic_url,
elastic_port,
elastic_verify_certs,
elastic_username,
elastic_password,
)
else:
elastic_search = None
summary = ChaosRunAlertSummary()
if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
# Quick connectivity probe for Prometheus disable resiliency if unreachable
try:
prometheus.process_prom_query_in_range(
"up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
)
except Exception as prom_exc:
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
run_mode = "disabled"
resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
logging.info("Server URL: %s" % kubecli.get_host())
if command == "list-rollback":
sys.exit(
list_rollback_command(
options.run_uuid, options.scenario_type
)
)
elif command == "execute-rollback":
sys.exit(
execute_rollback_command(
telemetry_ocp, options.run_uuid, options.scenario_type
)
)
# Initialize the start iteration to 0
iteration = 0
# Set the number of iterations to loop to infinity if daemon mode is
# enabled or else set it to the provided iterations count in the config
if daemon_mode:
logging.info("Daemon mode enabled, kraken will cause chaos forever\n")
logging.info("Ignoring the iterations set")
iterations = float("inf")
else:
logging.info(
"Daemon mode not enabled, will run through %s iterations\n"
% str(iterations)
)
iterations = int(iterations)
failed_post_scenarios = []
# Capture the start time
start_time = int(time.time())
post_critical_alerts = 0
chaos_output = ChaosRunOutput()
chaos_telemetry = ChaosRunTelemetry()
chaos_telemetry.run_uuid = run_uuid
chaos_telemetry.tag = elastic_run_tag
scenario_plugin_factory = ScenarioPluginFactory()
classes_and_types: dict[str, list[str]] = {}
for loaded in scenario_plugin_factory.loaded_plugins.keys():
if (
scenario_plugin_factory.loaded_plugins[loaded].__name__
not in classes_and_types.keys()
):
classes_and_types[
scenario_plugin_factory.loaded_plugins[loaded].__name__
] = []
classes_and_types[
scenario_plugin_factory.loaded_plugins[loaded].__name__
].append(loaded)
logging.info(
"📣 `ScenarioPluginFactory`: types from config.yaml mapped to respective classes for execution:"
)
for class_loaded in classes_and_types.keys():
if len(classes_and_types[class_loaded]) <= 1:
logging.info(
f" ✅ type: {classes_and_types[class_loaded][0]} ➡️ `{class_loaded}` "
)
else:
logging.info(
f" ✅ types: [{', '.join(classes_and_types[class_loaded])}] ➡️ `{class_loaded}` "
)
logging.info("\n")
if len(scenario_plugin_factory.failed_plugins) > 0:
logging.info("Failed to load Scenario Plugins:\n")
for failed in scenario_plugin_factory.failed_plugins:
module_name, class_name, error = failed
logging.error(f"⛔ Class: {class_name} Module: {module_name}")
logging.error(f"⚠️ {error}\n")
health_check_telemetry_queue = queue.Queue()
health_checker = HealthChecker(iterations)
health_check_worker = threading.Thread(target=health_checker.run_health_check,
args=(health_check_config, health_check_telemetry_queue))
health_check_worker.start()
kubevirt_check_telemetry_queue = queue.SimpleQueue()
kubevirt_checker = VirtChecker(kubevirt_check_config, iterations=iterations, krkn_lib=kubecli)
kubevirt_checker.batch_list(kubevirt_check_telemetry_queue)
# Loop to run the chaos starts here
while int(iteration) < iterations and run_signal != "STOP":
# Inject chaos scenarios specified in the config
logging.info("Executing scenarios for iteration " + str(iteration))
if chaos_scenarios:
for scenario in chaos_scenarios:
if publish_running_status:
run_signal = server.get_status(address)
if run_signal == "PAUSE":
while publish_running_status and run_signal == "PAUSE":
logging.info(
"Pausing Kraken run, waiting for %s seconds"
" and will re-poll signal" % str(wait_duration)
)
time.sleep(wait_duration)
run_signal = server.get_status(address)
if run_signal == "STOP":
logging.info("Received STOP signal; ending Kraken run")
break
scenario_type = list(scenario.keys())[0]
scenarios_list = scenario[scenario_type]
if scenarios_list:
try:
scenario_plugin = scenario_plugin_factory.create_plugin(
scenario_type
)
except ScenarioPluginNotFound:
logging.error(
f"impossible to find scenario {scenario_type}, plugin not found. Exiting"
)
sys.exit(-1)
batch_window_start_dt = datetime.datetime.utcnow()
failed_scenarios_current, scenario_telemetries = (
scenario_plugin.run_scenarios(
run_uuid, scenarios_list, config, telemetry_ocp
)
)
failed_post_scenarios.extend(failed_scenarios_current)
chaos_telemetry.scenarios.extend(scenario_telemetries)
batch_window_end_dt = datetime.datetime.utcnow()
if resiliency_obj:
resiliency_obj.add_scenario_reports(
scenario_telemetries=scenario_telemetries,
prom_cli=prometheus,
scenario_type=scenario_type,
batch_start_dt=batch_window_start_dt,
batch_end_dt=batch_window_end_dt,
)
post_critical_alerts = 0
if check_critical_alerts:
prometheus_plugin.critical_alerts(
prometheus,
summary,
elastic_search,
run_uuid,
scenario_type,
start_time,
datetime.datetime.now(),
elastic_alerts_index
)
chaos_output.critical_alerts = summary
post_critical_alerts = len(summary.post_chaos_alerts)
if post_critical_alerts > 0:
logging.error(
"Post chaos critical alerts firing please check, exiting"
)
break
iteration += 1
health_checker.current_iterations += 1
kubevirt_checker.increment_iterations()
# telemetry
# in order to print decoded telemetry data even if telemetry collection
# is disabled, it's necessary to serialize the ChaosRunTelemetry object
# to json, and recreate a new object from it.
end_time = int(time.time())
health_check_worker.join()
try:
chaos_telemetry.health_checks = health_check_telemetry_queue.get_nowait()
except queue.Empty:
chaos_telemetry.health_checks = None
kubevirt_checker.thread_join()
kubevirt_check_telem = []
while not kubevirt_check_telemetry_queue.empty():
kubevirt_check_telem.extend(kubevirt_check_telemetry_queue.get_nowait())
chaos_telemetry.virt_checks = kubevirt_check_telem
post_kubevirt_check = kubevirt_checker.gather_post_virt_checks(kubevirt_check_telem)
chaos_telemetry.post_virt_checks = post_kubevirt_check
# if platform is openshift will be collected
# Cloud platform and network plugins metadata
# through OCP specific APIs
if distribution == "openshift":
logging.info(
"collecting OCP cluster metadata, this may take few minutes...."
)
telemetry_ocp.collect_cluster_metadata(chaos_telemetry)
else:
logging.info("collecting Kubernetes cluster metadata....")
telemetry_k8s.collect_cluster_metadata(chaos_telemetry)
# Collect error logs from handler
error_logs = error_collection_handler.get_error_logs()
if error_logs:
logging.info(f"Collected {len(error_logs)} error logs for telemetry")
chaos_telemetry.error_logs = error_logs
else:
logging.info("No error logs collected during chaos run")
chaos_telemetry.error_logs = []
if resiliency_obj:
try:
resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
except Exception as exc:
logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)
if resiliency_obj:
try:
resiliency_obj.finalize_and_save(
prom_cli=prometheus,
total_start_time=datetime.datetime.fromtimestamp(start_time),
total_end_time=datetime.datetime.fromtimestamp(end_time),
run_mode=run_mode,
)
except Exception as e:
logging.error("Failed to finalize resiliency scoring: %s", e)
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
summary_dict = resiliency_obj.get_summary()
decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
json_object=summary_dict,
resiliency_score=summary_dict.get("resiliency_score", 0),
passed_slos=summary_dict.get("passed_slos", 0),
total_slos=summary_dict.get("total_slos", 0)
)
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
elastic_telemetry = ElasticChaosRunTelemetry(
chaos_run_telemetry=decoded_chaos_run_telemetry
)
result = elastic_search.push_telemetry(
decoded_chaos_run_telemetry, elastic_telemetry_index
)
if result == -1:
safe_logger.error(
f"failed to save telemetry on elastic search: {chaos_output.to_json()}"
)
if config["telemetry"]["enabled"]:
logging.info(
f"telemetry data will be stored on s3 bucket folder: {telemetry_api_url}/files/"
f'{(config["telemetry"]["telemetry_group"] if config["telemetry"]["telemetry_group"] else "default")}/'
f"{telemetry_request_id}"
)
logging.info(f"telemetry upload log: {safe_logger.log_file_name}")
try:
telemetry_k8s.send_telemetry(
config["telemetry"], telemetry_request_id, chaos_telemetry
)
telemetry_k8s.put_critical_alerts(
telemetry_request_id, config["telemetry"], summary
)
# prometheus data collection is available only on Openshift
if config["telemetry"]["prometheus_backup"]:
prometheus_archive_files = ""
if distribution == "openshift":
prometheus_archive_files = (
telemetry_ocp.get_ocp_prometheus_data(
config["telemetry"], telemetry_request_id
)
)
else:
if (
config["telemetry"]["prometheus_namespace"]
and config["telemetry"]["prometheus_pod_name"]
and config["telemetry"]["prometheus_container_name"]
):
try:
prometheus_archive_files = (
telemetry_k8s.get_prometheus_pod_data(
config["telemetry"],
telemetry_request_id,
config["telemetry"]["prometheus_pod_name"],
config["telemetry"][
"prometheus_container_name"
],
config["telemetry"]["prometheus_namespace"],
)
)
except Exception as e:
logging.error(
f"failed to get prometheus backup with exception {str(e)}"
)
else:
logging.warning(
"impossible to backup prometheus,"
"check if config contains telemetry.prometheus_namespace, "
"telemetry.prometheus_pod_name and "
"telemetry.prometheus_container_name"
)
if prometheus_archive_files:
safe_logger.info("starting prometheus archive upload:")
telemetry_k8s.put_prometheus_data(
config["telemetry"],
prometheus_archive_files,
telemetry_request_id,
)
except Exception as e:
logging.error(f"failed to send telemetry data: {str(e)}")
else:
logging.info("telemetry collection disabled, skipping.")
# Check for the alerts specified
if enable_alerts:
logging.info("Alerts checking is enabled")
if alert_profile:
prometheus_plugin.alerts(
prometheus,
elastic_search,
run_uuid,
start_time,
end_time,
alert_profile,
elastic_alerts_index
)
else:
logging.error("Alert profile is not defined")
return -1
# sys.exit(1)
if enable_metrics:
logging.info(f'Capturing metrics using file {metrics_profile}')
prometheus_plugin.metrics(
prometheus,
elastic_search,
run_uuid,
start_time,
end_time,
metrics_profile,
elastic_metrics_index,
telemetry_json
)
# want to exit with 1 first to show failure of scenario
# even if alerts failing
if failed_post_scenarios:
logging.error(
"Post scenarios are still failing at the end of all iterations"
)
# sys.exit(1)
return 1
if post_critical_alerts > 0:
logging.error("Critical alerts are firing, please check; exiting")
# sys.exit(2)
return 2
if health_checker.ret_value != 0:
logging.error("Health check failed for the applications, Please check; exiting")
return health_checker.ret_value
if kubevirt_checker.ret_value != 0:
logging.error("Kubevirt check still had failed VMIs at end of run, Please check; exiting")
return kubevirt_checker.ret_value
logging.info(
"Successfully finished running Kraken. UUID for the run: "
"%s. Report generated at %s. Exiting" % (run_uuid, report_file)
)
else:
logging.error("Cannot find a config at %s, please check" % (cfg))
# sys.exit(1)
return -1
return 0
if __name__ == "__main__":
# Initialize the parser to read the config
parser = optparse.OptionParser(
usage="%prog [options] [command]\n\n"
"Commands:\n"
" list-rollback List rollback version files in a tree-like format\n"
" execute-rollback Execute rollback version files and cleanup if successful\n\n"
"If no command is specified, kraken will run chaos scenarios.",
)
parser.add_option(
"-c",
"--config",
dest="cfg",
help="config location",
default="config/config.yaml",
)
parser.add_option(
"-o",
"--output",
dest="output",
help="output report location",
default="kraken.report",
)
parser.add_option(
"--junit-testcase",
dest="junit_testcase",
help="junit test case description",
default=None,
)
parser.add_option(
"--junit-testcase-path",
dest="junit_testcase_path",
help="junit test case path",
default=None,
)
parser.add_option(
"--junit-testcase-version",
dest="junit_testcase_version",
help="junit test case version",
default=None,
)
# Add rollback command options
parser.add_option(
"-r",
"--run_uuid",
dest="run_uuid",
help="run UUID to filter rollback operations",
default=None,
)
parser.add_option(
"-s",
"--scenario_type",
dest="scenario_type",
help="scenario type to filter rollback operations",
default=None,
)
parser.add_option(
"-d",
"--debug",
dest="debug",
help="enable debug logging",
default=False,
)
(options, args) = parser.parse_args()
# If no command or regular execution, continue with existing logic
report_file = options.output
tee_handler = TeeLogHandler()
fmt = "%(asctime)s [%(levelname)s] %(message)s"
plain = logging.Formatter(fmt)
colored = ColoredFormatter(
"%(asctime)s [%(log_color)s%(levelname)s%(reset)s] %(message)s",
log_colors={'DEBUG': 'white', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red'},
reset=True, style='%'
)
file_handler = logging.FileHandler(report_file, mode="w")
file_handler.setFormatter(plain)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(colored)
tee_handler.setFormatter(plain)
error_collection_handler = ErrorCollectionHandler(level=logging.ERROR)
handlers = [
file_handler,
stream_handler,
tee_handler,
error_collection_handler,
]
logging.basicConfig(
level=logging.DEBUG if options.debug else logging.INFO,
handlers=handlers,
)
option_error = False
# used to check if there is any missing or wrong parameter that prevents
# the creation of the junit file
junit_error = False
junit_normalized_path = None
retval = 0
junit_start_time = time.time()
# checks if both mandatory options for junit are set
if options.junit_testcase_path and not options.junit_testcase:
logging.error(
"please set junit test case description with --junit-testcase [description] option"
)
option_error = True
junit_error = True
if options.junit_testcase and not options.junit_testcase_path:
logging.error(
"please set junit test case path with --junit-testcase-path [path] option"
)
option_error = True
junit_error = True
# normalized path
if options.junit_testcase:
junit_normalized_path = os.path.normpath(options.junit_testcase_path)
if not os.path.exists(junit_normalized_path):
logging.error(
f"{junit_normalized_path} do not exists, please select a valid path"
)
option_error = True
junit_error = True
if not os.path.isdir(junit_normalized_path):
logging.error(
f"{junit_normalized_path} is a file, please select a valid folder path"
)
option_error = True
junit_error = True
if not os.access(junit_normalized_path, os.W_OK):
logging.error(
f"{junit_normalized_path} is not writable, please select a valid path"
)
option_error = True
junit_error = True
if options.cfg is None:
logging.error("Please check if you have passed the config")
option_error = True
if option_error:
retval = 1
else:
# Check if command is provided as positional argument
command = args[0] if args else None
retval = main(options, command)
junit_endtime = time.time()
# checks the minimum required parameters to write the junit file
if junit_normalized_path and not junit_error:
junit_testcase_xml = get_junit_test_case(
success=True if retval == 0 else False,
time=int(junit_endtime - junit_start_time),
test_suite_name="chaos-krkn",
test_case_description=options.junit_testcase,
test_stdout=tee_handler.get_output(),
test_version=options.junit_testcase_version,
)
junit_testcase_file_path = (
f"{junit_normalized_path}/junit_krkn_{int(time.time())}.xml"
)
logging.info(f"writing junit XML testcase in {junit_testcase_file_path}")
with open(junit_testcase_file_path, "w") as stream:
stream.write(junit_testcase_xml)
sys.exit(retval)