diff --git a/krkn/resiliency/__init__.py b/krkn/resiliency/__init__.py
index d88fbf8d..b18b57c2 100644
--- a/krkn/resiliency/__init__.py
+++ b/krkn/resiliency/__init__.py
@@ -1,4 +1,4 @@
 """krkn.resiliency package public interface."""
 
-from .resiliency import Resiliency, compute_resiliency  # noqa: F401
+from .resiliency import Resiliency  # noqa: F401
 from .score import calculate_resiliency_score  # noqa: F401
diff --git a/krkn/resiliency/resiliency.py b/krkn/resiliency/resiliency.py
index 7d18e127..a7d2e079 100644
--- a/krkn/resiliency/resiliency.py
+++ b/krkn/resiliency/resiliency.py
@@ -9,11 +9,10 @@ in `krkn.resiliency.score`.
 
 from __future__ import annotations
 
-import base64
 import datetime
 import logging
 import os
 from typing import Dict, List, Any, Optional
 import yaml
 import json
 
@@ -28,56 +27,13 @@ from krkn.resiliency.score import calculate_resiliency_score
 class Resiliency:
     """Central orchestrator for resiliency scoring."""
 
-    ENV_VAR_NAME = "KRKN_ALERTS_YAML_CONTENT"
-
-    def __init__(self, alerts_yaml_path: str = "config/alerts.yaml"):
-        """Load SLO definitions from the default alerts file, unless the
-        *KRKN_ALERTS_YAML_CONTENT* environment variable is set – in which case its
-        raw YAML string is parsed instead. The custom YAML may optionally follow
-        this schema:
-
-        prometheus_url: http://prometheus:9090  # optional, currently unused
-        slos:
-          - expr: <promQL>
-            severity: critical|warning
-            description: <name>
-
-        For backward-compatibility the legacy list-only format is still accepted.
-        """
-        raw_yaml_data: Any
-        env_yaml = os.getenv(self.ENV_VAR_NAME, '').strip()
-        if env_yaml:
-            try:
-                try:
-                    decoded_yaml = base64.b64decode(env_yaml, validate=True).decode('utf-8')
-                except (base64.binascii.Error, UnicodeDecodeError) as e:
-                    logging.debug("Failed to base64 decode %s, trying as plain YAML: %s",
-                                  self.ENV_VAR_NAME, str(e))
-                    decoded_yaml = env_yaml
-
-                raw_yaml_data = yaml.safe_load(decoded_yaml)
-                logging.info("Loaded SLO configuration from environment variable %s", self.ENV_VAR_NAME)
-
-                if isinstance(raw_yaml_data, dict):
-                    self.prometheus_url = raw_yaml_data.get("prometheus_url")
-                    raw_yaml_data = raw_yaml_data.get("slos", raw_yaml_data.get("alerts", []))
-
-            except yaml.YAMLError as exc:
-                logging.error("Failed to parse YAML from %s: %s", self.ENV_VAR_NAME, str(exc))
-                raw_yaml_data = []
-                self.prometheus_url = None
-            except Exception as exc:
-                logging.error("Unexpected error loading SLOs from %s: %s",
-                              self.ENV_VAR_NAME, str(exc))
-                raw_yaml_data = []
-                self.prometheus_url = None
-        else:
-            if not os.path.exists(alerts_yaml_path):
-                raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
-            with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
-                raw_yaml_data = yaml.safe_load(fp)
-            logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
-            self.prometheus_url = None
+    def __init__(self, alerts_yaml_path: str):
+        """Load SLO definitions from the YAML file at *alerts_yaml_path*."""
+        if not os.path.exists(alerts_yaml_path):
+            raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
+        with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
+            raw_yaml_data = yaml.safe_load(fp)
+        logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
 
         self._slos = self._normalise_alerts(raw_yaml_data)
         self._results: Dict[str, bool] = {}
@@ -95,16 +51,14 @@ class Resiliency:
     def calculate_score(
         self,
         *,
-        weights: Optional[Dict[str, int]] = None,
         health_check_results: Optional[Dict[str, bool]] = None,
     ) -> int:
         """Calculate the resiliency score using collected SLO results."""
-        slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
+        slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
         score, breakdown = calculate_resiliency_score(
             slo_definitions=slo_defs,
             prometheus_results=self._results,
             health_check_results=health_check_results or {},
-            weights=weights,
         )
         self._score = score
         self._breakdown = breakdown
@@ -134,7 +88,6 @@ class Resiliency:
         end_time: datetime.datetime,
         weight: float | int = 1,
         health_check_results: Optional[Dict[str, bool]] = None,
-        weights: Optional[Dict[str, int]] = None,
     ) -> int:
         """
         Evaluate SLOs for a single scenario window and store the result.
@@ -146,7 +99,6 @@ class Resiliency:
             end_time: Window end.
             weight: Weight to use for the final weighted average calculation.
             health_check_results: Optional mapping of custom health-check name ➡ bool.
-            weights: Optional override of severity weights for SLO calculation.
         Returns:
             The calculated integer resiliency score (0-100) for this scenario.
         """
@@ -156,12 +108,11 @@ class Resiliency:
             start_time=start_time,
             end_time=end_time,
         )
-        slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
+        slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
        score, breakdown = calculate_resiliency_score(
             slo_definitions=slo_defs,
             prometheus_results=slo_results,
             health_check_results=health_check_results or {},
-            weights=weights,
         )
         self.scenario_reports.append(
             {
@@ -185,7 +136,6 @@ class Resiliency:
         prom_cli: KrknPrometheus,
         total_start_time: datetime.datetime,
         total_end_time: datetime.datetime,
-        weights: Optional[Dict[str, int]] = None,
     ) -> None:
         if not self.scenario_reports:
             raise RuntimeError("No scenario reports added – nothing to finalize")
@@ -203,12 +153,11 @@ class Resiliency:
             start_time=total_start_time,
             end_time=total_end_time,
         )
-        slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
+        slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
         _overall_score, full_breakdown = calculate_resiliency_score(
             slo_definitions=slo_defs,
             prometheus_results=full_slo_results,
             health_check_results={},
-            weights=weights,
         )
 
         self.summary = {
@@ -279,6 +228,120 @@ class Resiliency:
                 new_scenarios.append(item_dict)
         chaos_telemetry.scenarios = new_scenarios
 
+    def add_scenario_reports(
+        self,
+        *,
+        scenario_telemetries,
+        prom_cli: KrknPrometheus,
+        scenario_type: str,
+        batch_start_dt: datetime.datetime,
+        batch_end_dt: datetime.datetime,
+        weight: int | float = 1,
+    ) -> None:
+        """Evaluate SLOs for every telemetry item belonging to a scenario window,
+        store the result and enrich the telemetry list with a compact resiliency breakdown.
+
+        Args:
+            scenario_telemetries: Iterable with telemetry objects/dicts for the
+                current scenario batch window.
+            prom_cli: Pre-configured :class:`KrknPrometheus` instance.
+            scenario_type: Fallback scenario identifier in case individual
+                telemetry items do not provide one.
+            batch_start_dt: Fallback start timestamp for the batch window.
+            batch_end_dt: Fallback end timestamp for the batch window.
+            weight: Weight to assign to every scenario when calculating the final
+                weighted average.
+        """
+
+        for tel in scenario_telemetries:
+            try:
+                # -------- Extract timestamps & scenario name --------------------
+                if isinstance(tel, dict):
+                    st_ts = tel.get("start_timestamp")
+                    en_ts = tel.get("end_timestamp")
+                    scen_name = tel.get("scenario", scenario_type)
+                else:
+                    st_ts = getattr(tel, "start_timestamp", None)
+                    en_ts = getattr(tel, "end_timestamp", None)
+                    scen_name = getattr(tel, "scenario", scenario_type)
+
+                if st_ts and en_ts:
+                    st_dt = datetime.datetime.fromtimestamp(int(st_ts))
+                    en_dt = datetime.datetime.fromtimestamp(int(en_ts))
+                else:
+                    st_dt = batch_start_dt
+                    en_dt = batch_end_dt
+
+                # -------- Calculate resiliency score for the scenario -----------
+                self.add_scenario_report(
+                    scenario_name=str(scen_name),
+                    prom_cli=prom_cli,
+                    start_time=st_dt,
+                    end_time=en_dt,
+                    weight=weight,
+                    health_check_results=None,
+                )
+
+                compact = self.compact_breakdown(self.scenario_reports[-1])
+                if isinstance(tel, dict):
+                    tel["resiliency_report"] = compact
+                else:
+                    setattr(tel, "resiliency_report", compact)
+            except Exception as exc:
+                logging.error("Resiliency per-scenario evaluation failed: %s", exc)
+
+    def finalize_and_save(
+        self,
+        *,
+        prom_cli: KrknPrometheus,
+        total_start_time: datetime.datetime,
+        total_end_time: datetime.datetime,
+        run_mode: str = "standalone",
+        detailed_path: str = "resiliency-report.json",
+    ) -> Dict[str, Any]:
+        """Finalize resiliency scoring, persist the report and return it.
+
+        Args:
+            prom_cli: Pre-configured KrknPrometheus instance.
+            total_start_time: Start time for the full test window.
+            total_end_time: End time for the full test window.
+            run_mode: "controller" or "standalone" mode.
+            detailed_path: File path for the detailed report in standalone mode.
+
+        Returns:
+            The detailed resiliency report, or an empty dict on failure.
+        """
+
+        try:
+            self.finalize_report(
+                prom_cli=prom_cli,
+                total_start_time=total_start_time,
+                total_end_time=total_end_time,
+            )
+            detailed = self.get_detailed_report()
+
+            if run_mode == "controller":
+                # krknctl expects the detailed report on stdout in a special format
+                try:
+                    detailed_json = json.dumps(detailed)
+                    print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
+                    logging.info("Resiliency report logged to stdout for krknctl.")
+                except Exception as exc:
+                    logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
+            else:
+                # Stand-alone mode – write to a file for post-run consumption
+                try:
+                    with open(detailed_path, "w", encoding="utf-8") as fp:
+                        json.dump(detailed, fp, indent=2)
+                    logging.info("Resiliency report written: %s", detailed_path)
+                except Exception as io_exc:
+                    logging.error("Failed to write resiliency report file: %s", io_exc)
+
+            return detailed
+        except Exception as exc:
+            logging.error("Failed to finalize resiliency scoring: %s", exc)
+            return {}
     # ------------------------------------------------------------------
     # Internal helpers
     # ------------------------------------------------------------------
@@ -299,227 +360,7 @@ class Resiliency:
                 "name": name,
                 "expr": alert["expr"],
                 "severity": str(alert["severity"]).lower(),
+                "weight": alert.get("weight"),
             }
         )
         return slos
-
-# -----------------------------------------------------------------------------
-# High-level helper for run_kraken.py
-# -----------------------------------------------------------------------------
-
-def compute_resiliency(*,
-    prometheus: KrknPrometheus,
-    chaos_telemetry: "ChaosRunTelemetry",
-    start_time: datetime.datetime,
-    end_time: datetime.datetime,
-    run_uuid: Optional[str] = None,
-    alerts_yaml_path: str = "config/alerts.yaml",
-    logger: Optional[logging.Logger] = None,
-) -> Optional[Dict[str, Any]]:
-    """Evaluate SLOs, combine health-check results, attach a resiliency report
-    to *chaos_telemetry* and return the report. Any failure is logged and *None*
-    is returned.
-    """
-
-    log = logger or logging.getLogger(__name__)
-
-    try:
-        resiliency_obj = Resiliency(alerts_yaml_path)
-        resiliency_obj._results = evaluate_slos(
-            prom_cli=prometheus,
-            slo_list=resiliency_obj._slos,
-            start_time=start_time,
-            end_time=end_time,
-        )
-
-        health_results: Dict[str, bool] = {}
-        hc_list = getattr(chaos_telemetry, "health_checks", None)
-        if hc_list:
-            for idx, hc in enumerate(hc_list):
-                # Extract URL/name
-                try:
-                    name = getattr(hc, "url", None)
-                    if name is None and isinstance(hc, dict):
-                        name = hc.get("url", f"hc_{idx}")
-                except Exception:
-                    name = f"hc_{idx}"
-                # Extract status
-                try:
-                    status = getattr(hc, "status", None)
-                    if status is None and isinstance(hc, dict):
-                        status = hc.get("status", True)
-                except Exception:
-                    status = False
-                health_results[str(name)] = bool(status)
-
-        resiliency_obj.calculate_score(health_check_results=health_results)
-        resiliency_report = resiliency_obj.to_dict()
-        chaos_telemetry.resiliency_report = resiliency_report
-        chaos_telemetry.resiliency_score = resiliency_report.get("score")
-
-
-        if not hasattr(ChaosRunTelemetry, "_with_resiliency_patch"):
-            _orig_to_json = ChaosRunTelemetry.to_json
-
-            def _to_json_with_resiliency(self):
-                raw_json = _orig_to_json(self)
-                try:
-                    data = json.loads(raw_json)
-                except Exception:
-                    return raw_json
-                if hasattr(self, "resiliency_report"):
-                    data["resiliency_report"] = self.resiliency_report
-                if hasattr(self, "resiliency_score"):
-                    data["resiliency_score"] = self.resiliency_score
-                return json.dumps(data)
-
-            ChaosRunTelemetry.to_json = _to_json_with_resiliency
-            ChaosRunTelemetry._with_resiliency_patch = True
-
-        log.info(
-            "Resiliency score for run %s: %s%%",
-            run_uuid or "",
-            resiliency_report.get("score"),
-        )
-        return resiliency_report
-
-    except Exception as exc:
-        log.error("Failed to compute resiliency score: %s", exc)
-        return None
-
-
-# -----------------------------------------------------------------------------
-# Helper utilities extracted from run_kraken.py
-# -----------------------------------------------------------------------------
-
-from typing import Tuple
-
-
-def add_scenario_reports(
-    *,
-    resiliency_obj: "Resiliency",
-    scenario_telemetries,
-    prom_cli: KrknPrometheus,
-    scenario_type: str,
-    batch_start_dt: datetime.datetime,
-    batch_end_dt: datetime.datetime,
-    weight: int | float = 1,
-    logger: Optional[logging.Logger] = None,
-) -> None:
-    """Evaluate SLOs for every telemetry item belonging to a scenario window,
-    store the result in *resiliency_obj* and enrich the telemetry list with a
-    compact resiliency breakdown.
-
-    Args:
-        resiliency_obj: Initialized :class:`Resiliency` orchestrator. If *None*,
-            the call becomes a no-op (saves caller side checks).
-        scenario_telemetries: Iterable with telemetry objects/dicts for the
-            current scenario batch window.
-        prom_cli: Pre-configured :class:`KrknPrometheus` instance.
-        scenario_type: Fallback scenario identifier in case individual
-            telemetry items do not provide one.
-        batch_start_dt: Fallback start timestamp for the batch window.
-        batch_end_dt: Fallback end timestamp for the batch window.
-        weight: Weight to assign to every scenario when calculating the final
-            weighted average.
-        logger: Optional custom logger.
-    """
-    if resiliency_obj is None:
-        return
-
-    log = logger or logging.getLogger(__name__)
-
-    for tel in scenario_telemetries:
-        try:
-            # -------- Extract timestamps & scenario name --------------------
-            if isinstance(tel, dict):
-                st_ts = tel.get("start_timestamp")
-                en_ts = tel.get("end_timestamp")
-                scen_name = tel.get("scenario", scenario_type)
-            else:
-                st_ts = getattr(tel, "start_timestamp", None)
-                en_ts = getattr(tel, "end_timestamp", None)
-                scen_name = getattr(tel, "scenario", scenario_type)
-
-            if st_ts and en_ts:
-                st_dt = datetime.datetime.fromtimestamp(int(st_ts))
-                en_dt = datetime.datetime.fromtimestamp(int(en_ts))
-            else:
-                st_dt = batch_start_dt
-                en_dt = batch_end_dt
-
-            # -------- Calculate resiliency score for the scenario -----------
-            resiliency_obj.add_scenario_report(
-                scenario_name=str(scen_name),
-                prom_cli=prom_cli,
-                start_time=st_dt,
-                end_time=en_dt,
-                weight=weight,
-                health_check_results=None,
-            )
-
-            compact = Resiliency.compact_breakdown(
-                resiliency_obj.scenario_reports[-1]
-            )
-            if isinstance(tel, dict):
-                tel["resiliency_report"] = compact
-            else:
-                setattr(tel, "resiliency_report", compact)
-        except Exception as exc:
-            log.error("Resiliency per-scenario evaluation failed: %s", exc)
-
-
-def finalize_and_save(
-    *,
-    resiliency_obj: "Resiliency",
-    prom_cli: KrknPrometheus,
-    total_start_time: datetime.datetime,
-    total_end_time: datetime.datetime,
-    run_mode: str = "standalone",
-    summary_path: str = "kraken.report",
-    detailed_path: str = "resiliency-report.json",
-    logger: Optional[logging.Logger] = None,
-) -> Tuple[Dict[str, Any], Dict[str, Any]]:
-    """Finalize resiliency scoring, persist reports and return them.
-
-    Returns:
-        (summary_report, detailed_report)
-    """
-    if resiliency_obj is None:
-        return {}, {}
-
-    log = logger or logging.getLogger(__name__)
-
-    try:
-        resiliency_obj.finalize_report(
-            prom_cli=prom_cli,
-            total_start_time=total_start_time,
-            total_end_time=total_end_time,
-        )
-        summary = resiliency_obj.get_summary()
-        detailed = resiliency_obj.get_detailed_report()
-
-        if run_mode == "controller":
-            # krknctl expects the detailed report on stdout in a special format
-            try:
-                detailed_json = json.dumps(detailed)
-                print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
-                log.info("Resiliency report logged to stdout for krknctl.")
-            except Exception as exc:
-                log.error("Failed to serialize and log detailed resiliency report: %s", exc)
-        else:
-            # Stand-alone mode – write to files for post-run consumption
-            try:
-                with open(summary_path, "w", encoding="utf-8") as fp:
-                    json.dump(summary, fp, indent=2)
-                with open(detailed_path, "w", encoding="utf-8") as fp:
-                    json.dump(detailed, fp, indent=2)
-                log.info("Resiliency reports written: %s and %s", summary_path, detailed_path)
-            except Exception as io_exc:
-                log.error("Failed to write resiliency report files: %s", io_exc)
-
-        return summary, detailed
-
-    except Exception as exc:
-        log.error("Failed to finalize resiliency scoring: %s", exc)
-        return {}, {}
\ No newline at end of file
diff --git a/krkn/resiliency/score.py b/krkn/resiliency/score.py
index 389ffb8c..811ed216 100644
--- a/krkn/resiliency/score.py
+++ b/krkn/resiliency/score.py
@@ -8,53 +8,62 @@ DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}
 class SLOResult:
     """Simple container representing evaluation outcome for a single SLO."""
 
-    def __init__(self, name: str, severity: str, passed: bool):
+    def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
         self.name = name
         self.severity = severity
         self.passed = passed
+        self._custom_weight = weight
 
-    def weight(self, weights: Dict[str, int] | None = None) -> int:
-        _w = weights or DEFAULT_WEIGHTS
-        return _w.get(self.severity, DEFAULT_WEIGHTS["warning"])
+    def weight(self, severity_weights: Dict[str, int]) -> int:
+        """Return the weight for this SLO: the custom weight if set, otherwise the severity-based weight."""
+        if self._custom_weight is not None:
+            return self._custom_weight
+        return severity_weights.get(self.severity, severity_weights.get("warning", 1))
 
 
 def calculate_resiliency_score(
-    slo_definitions: Dict[str, str],
+    slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
     prometheus_results: Dict[str, bool],
    health_check_results: Dict[str, bool],
-    weights: Dict[str, int] | None = None,
 ) -> Tuple[int, Dict[str, int]]:
     """Compute a resiliency score between 0-100 based on SLO pass/fail results.
 
     Args:
-        slo_definitions: Mapping of SLO name -> severity ("critical" | "warning").
+        slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
+            SLO name -> {"severity": str, "weight": int | None}.
         prometheus_results: Mapping of SLO name -> bool indicating whether the
             SLO passed. Any SLO missing in this mapping is treated as failed.
         health_check_results: Mapping of custom health-check name -> bool pass
             flag. These checks are always treated as *critical*.
-        weights: Optional override of severity weights.
 
     Returns:
         Tuple containing (final_score, breakdown) where *breakdown* is a dict
         with the counts of passed/failed SLOs per severity.
     """
-    weights = weights or DEFAULT_WEIGHTS
-
     slo_objects: List[SLOResult] = []
-    for slo_name, severity in slo_definitions.items():
+    for slo_name, slo_def in slo_definitions.items():
         # Exclude SLOs that were not evaluated (query returned no data)
         if slo_name not in prometheus_results:
             continue
         passed = bool(prometheus_results[slo_name])
+
+        # Support both the legacy format (severity string) and the new format (dict)
+        if isinstance(slo_def, str):
+            severity = slo_def
+            slo_weight = None
+        else:
+            severity = slo_def.get("severity", "warning")
+            slo_weight = slo_def.get("weight")
+
-        slo_objects.append(SLOResult(slo_name, severity, passed))
+        slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
 
     # Health-check SLOs (by default keeping them critical)
     for hc_name, hc_passed in health_check_results.items():
         slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
 
-    total_points = sum(slo.weight(weights) for slo in slo_objects)
-    points_lost = sum(slo.weight(weights) for slo in slo_objects if not slo.passed)
+    total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
+    points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
 
     score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)
diff --git a/run_kraken.py b/run_kraken.py
index 619c5ca1..e03a581e 100644
--- a/run_kraken.py
+++ b/run_kraken.py
@@ -22,10 +22,7 @@ from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
 import krkn.prometheus as prometheus_plugin
 import server as server
 from krkn.resiliency.resiliency import (
-    Resiliency,
-    compute_resiliency,
-    add_scenario_reports,
-    finalize_and_save,
+    Resiliency,
 )
 from krkn_lib.k8s import KrknKubernetes
 from krkn_lib.ocp import KrknOpenshift
@@ -61,12 +58,7 @@ def main(options, command: Optional[str]) -> int:
     print(pyfiglet.figlet_format("kraken"))
     logging.info("Starting kraken")
 
-    # Determine execution mode (standalone, controller, or disabled)
-    run_mode = (os.getenv("RESILIENCY_ENABLED_MODE") or "standalone").lower().strip()
-    valid_run_modes = {"standalone", "controller", "disabled"}
-    if run_mode not in valid_run_modes:
-        logging.warning("Unknown RESILIENCY_ENABLED_MODE '%s'. Defaulting to 'standalone'", run_mode)
-        run_mode = "standalone"
+
     cfg = options.cfg
 
     # Parse and read the config
@@ -79,6 +71,7 @@ def main(options, command: Optional[str]) -> int:
         get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
     )
     kraken_config = cfg
+
     chaos_scenarios = get_yaml_item_value(
         config["kraken"], "chaos_scenarios", []
     )
@@ -100,6 +93,13 @@ def main(options, command: Optional[str]) -> int:
         config["kraken"], "signal_address", "0.0.0.0"
     )
     run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
+
+    # Determine execution mode (standalone, controller, or disabled)
+    run_mode = get_yaml_item_value(config.get("resiliency", {}), "resiliency_run_mode", "standalone")
+    valid_run_modes = {"standalone", "controller", "disabled"}
+    if run_mode not in valid_run_modes:
+        logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
+        run_mode = "standalone"
     wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
     iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
     daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
@@ -282,8 +282,8 @@ def main(options, command: Optional[str]) -> int:
         except Exception as prom_exc:
             logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
             run_mode = "disabled"
-
-    resiliency_obj = Resiliency() if run_mode != "disabled" else None  # Initialize resiliency orchestrator
+    resiliency_alerts = get_yaml_item_value(config.get("resiliency", {}), "resiliency_file", get_yaml_item_value(config.get("performance_monitoring", {}), "alert_profile", "config/alerts.yaml"))
+    resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None  # Initialize resiliency orchestrator
 
     logging.info("Server URL: %s" % kubecli.get_host())
     if command == "list-rollback":
@@ -410,8 +410,7 @@ def main(options, command: Optional[str]) -> int:
                     chaos_telemetry.scenarios.extend(scenario_telemetries)
                     batch_window_end_dt = datetime.datetime.utcnow()
                     if resiliency_obj:
-                        add_scenario_reports(
-                            resiliency_obj=resiliency_obj,
+                        resiliency_obj.add_scenario_reports(
                             scenario_telemetries=scenario_telemetries,
                             prom_cli=prometheus,
                             scenario_type=scenario_type,
@@ -490,13 +489,11 @@ def main(options, command: Optional[str]) -> int:
 
     if resiliency_obj:
         try:
-            summary_report, detailed_report = finalize_and_save(
-                resiliency_obj=resiliency_obj,
+            resiliency_obj.finalize_and_save(
                 prom_cli=prometheus,
                 total_start_time=datetime.datetime.fromtimestamp(start_time),
                 total_end_time=datetime.datetime.fromtimestamp(end_time),
                 run_mode=run_mode,
-                logger=logging,
             )
         except Exception as e: