custom weight

This commit is contained in:
Paige Patton
2026-02-24 16:13:02 -05:00
parent 4e7dca9474
commit ec5511b2db
4 changed files with 162 additions and 315 deletions

View File

@@ -1,4 +1,4 @@
"""krkn.resiliency package public interface."""
from .resiliency import Resiliency, compute_resiliency # noqa: F401
from .resiliency import Resiliency # noqa: F401
from .score import calculate_resiliency_score # noqa: F401

View File

@@ -9,11 +9,10 @@ in `krkn.resiliency.score`.
from __future__ import annotations
import base64
import datetime
import logging
import os
from typing import Dict, List, Any, Optional
from typing import Dict, List, Any, Optional, Tuple
import yaml
import json
@@ -28,56 +27,13 @@ from krkn.resiliency.score import calculate_resiliency_score
class Resiliency:
"""Central orchestrator for resiliency scoring."""
ENV_VAR_NAME = "KRKN_ALERTS_YAML_CONTENT"
def __init__(self, alerts_yaml_path: str = "config/alerts.yaml"):
"""Load SLO definitions from the default alerts file, unless the
*KRKN_ALERTS_YAML_CONTENT* environment variable is set in which case its
raw YAML string is parsed instead. The custom YAML may optionally follow
this schema:
prometheus_url: http://prometheus:9090 # optional, currently unused
slos:
- expr: <PromQL>
severity: critical|warning
description: <text>
For backward-compatibility the legacy list-only format is still accepted.
"""
raw_yaml_data: Any
env_yaml = os.getenv(self.ENV_VAR_NAME, '').strip()
if env_yaml:
try:
try:
decoded_yaml = base64.b64decode(env_yaml, validate=True).decode('utf-8')
except (base64.binascii.Error, UnicodeDecodeError) as e:
logging.debug("Failed to base64 decode %s, trying as plain YAML: %s",
self.ENV_VAR_NAME, str(e))
decoded_yaml = env_yaml
raw_yaml_data = yaml.safe_load(decoded_yaml)
logging.info("Loaded SLO configuration from environment variable %s", self.ENV_VAR_NAME)
if isinstance(raw_yaml_data, dict):
self.prometheus_url = raw_yaml_data.get("prometheus_url")
raw_yaml_data = raw_yaml_data.get("slos", raw_yaml_data.get("alerts", []))
except yaml.YAMLError as exc:
logging.error("Failed to parse YAML from %s: %s", self.ENV_VAR_NAME, str(exc))
raw_yaml_data = []
self.prometheus_url = None
except Exception as exc:
logging.error("Unexpected error loading SLOs from %s: %s",
self.ENV_VAR_NAME, str(exc))
raw_yaml_data = []
self.prometheus_url = None
else:
if not os.path.exists(alerts_yaml_path):
raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
raw_yaml_data = yaml.safe_load(fp)
logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
self.prometheus_url = None
def __init__(self, alerts_yaml_path: str):
if not os.path.exists(alerts_yaml_path):
raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
raw_yaml_data = yaml.safe_load(fp)
logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
self._slos = self._normalise_alerts(raw_yaml_data)
self._results: Dict[str, bool] = {}
@@ -95,16 +51,14 @@ class Resiliency:
def calculate_score(
self,
*,
weights: Optional[Dict[str, int]] = None,
health_check_results: Optional[Dict[str, bool]] = None,
) -> int:
"""Calculate the resiliency score using collected SLO results."""
slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
score, breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=self._results,
health_check_results=health_check_results or {},
weights=weights,
)
self._score = score
self._breakdown = breakdown
@@ -134,7 +88,6 @@ class Resiliency:
end_time: datetime.datetime,
weight: float | int = 1,
health_check_results: Optional[Dict[str, bool]] = None,
weights: Optional[Dict[str, int]] = None,
) -> int:
"""
Evaluate SLOs for a single scenario window and store the result.
@@ -146,7 +99,6 @@ class Resiliency:
end_time: Window end.
weight: Weight to use for the final weighted average calculation.
health_check_results: Optional mapping of custom health-check name ➡ bool.
weights: Optional override of severity weights for SLO calculation.
Returns:
The calculated integer resiliency score (0-100) for this scenario.
"""
@@ -156,12 +108,11 @@ class Resiliency:
start_time=start_time,
end_time=end_time,
)
slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
score, breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=slo_results,
health_check_results=health_check_results or {},
weights=weights,
)
self.scenario_reports.append(
{
@@ -185,7 +136,6 @@ class Resiliency:
prom_cli: KrknPrometheus,
total_start_time: datetime.datetime,
total_end_time: datetime.datetime,
weights: Optional[Dict[str, int]] = None,
) -> None:
if not self.scenario_reports:
raise RuntimeError("No scenario reports added nothing to finalize")
@@ -203,12 +153,11 @@ class Resiliency:
start_time=total_start_time,
end_time=total_end_time,
)
slo_defs = {slo["name"]: slo["severity"] for slo in self._slos}
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
_overall_score, full_breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=full_slo_results,
health_check_results={},
weights=weights,
)
self.summary = {
@@ -279,6 +228,118 @@ class Resiliency:
new_scenarios.append(item_dict)
chaos_telemetry.scenarios = new_scenarios
def add_scenario_reports(
self,
*,
scenario_telemetries,
prom_cli: KrknPrometheus,
scenario_type: str,
batch_start_dt: datetime.datetime,
batch_end_dt: datetime.datetime,
weight: int | float = 1,
) -> None:
"""Evaluate SLOs for every telemetry item belonging to a scenario window,
store the result and enrich each telemetry item with a compact resiliency breakdown.
Args:
scenario_telemetries: Iterable with telemetry objects/dicts for the
current scenario batch window.
prom_cli: Pre-configured :class:`KrknPrometheus` instance.
scenario_type: Fallback scenario identifier in case individual
telemetry items do not provide one.
batch_start_dt: Fallback start timestamp for the batch window, used
when a telemetry item carries no timestamps of its own.
batch_end_dt: Fallback end timestamp for the batch window.
weight: Weight to assign to every scenario when calculating the final
weighted average.
"""
# Each item is processed independently; a failure on one telemetry item is
# logged and must not abort the remaining items in the batch.
for tel in scenario_telemetries:
try:
# -------- Extract timestamps & scenario name --------------------
# Telemetry items may be plain dicts or objects with attributes;
# support both shapes with the same fallback semantics.
if isinstance(tel, dict):
st_ts = tel.get("start_timestamp")
en_ts = tel.get("end_timestamp")
scen_name = tel.get("scenario", scenario_type)
else:
st_ts = getattr(tel, "start_timestamp", None)
en_ts = getattr(tel, "end_timestamp", None)
scen_name = getattr(tel, "scenario", scenario_type)
if st_ts and en_ts:
# NOTE(review): fromtimestamp() yields naive local-time datetimes;
# assumes timestamps are epoch seconds — confirm against producers.
st_dt = datetime.datetime.fromtimestamp(int(st_ts))
en_dt = datetime.datetime.fromtimestamp(int(en_ts))
else:
# Fall back to the batch window when per-item timestamps are missing.
st_dt = batch_start_dt
en_dt = batch_end_dt
# -------- Calculate resiliency score for the scenario -----------
self.add_scenario_report(
scenario_name=str(scen_name),
prom_cli=prom_cli,
start_time=st_dt,
end_time=en_dt,
weight=weight,
health_check_results=None,
)
# Attach a compact view of the report just appended by add_scenario_report.
compact = self.compact_breakdown(self.scenario_reports[-1])
if isinstance(tel, dict):
tel["resiliency_report"] = compact
else:
setattr(tel, "resiliency_report", compact)
except Exception as exc:
logging.error("Resiliency per-scenario evaluation failed: %s", exc)
def finalize_and_save(
self,
*,
prom_cli: KrknPrometheus,
total_start_time: datetime.datetime,
total_end_time: datetime.datetime,
run_mode: str = "standalone",
detailed_path: str = "resiliency-report.json",
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Finalize resiliency scoring, persist reports and return them.
Args:
prom_cli: Pre-configured KrknPrometheus instance.
total_start_time: Start time for the full test window.
total_end_time: End time for the full test window.
run_mode: "controller" or "standalone" mode.
Returns:
(detailed_report)
"""
try:
self.finalize_report(
prom_cli=prom_cli,
total_start_time=total_start_time,
total_end_time=total_end_time,
)
detailed = self.get_detailed_report()
if run_mode == "controller":
# krknctl expects the detailed report on stdout in a special format
try:
detailed_json = json.dumps(detailed)
print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
logging.info("Resiliency report logged to stdout for krknctl.")
except Exception as exc:
logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
else:
# Stand-alone mode write to files for post-run consumption
try:
with open(detailed_path, "w", encoding="utf-8") as fp:
json.dump(detailed, fp, indent=2)
logging.info("Resiliency report written: %s", detailed_path)
except Exception as io_exc:
logging.error("Failed to write resiliency report files: %s", io_exc)
except Exception as exc:
logging.error("Failed to finalize resiliency scoring: %s", exc)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@@ -299,227 +360,7 @@ class Resiliency:
"name": name,
"expr": alert["expr"],
"severity": str(alert["severity"]).lower(),
"weight": alert.get("weight")
}
)
return slos
# -----------------------------------------------------------------------------
# High-level helper for run_kraken.py
# -----------------------------------------------------------------------------
def compute_resiliency(*,
prometheus: KrknPrometheus,
chaos_telemetry: "ChaosRunTelemetry",
start_time: datetime.datetime,
end_time: datetime.datetime,
run_uuid: Optional[str] = None,
alerts_yaml_path: str = "config/alerts.yaml",
logger: Optional[logging.Logger] = None,
) -> Optional[Dict[str, Any]]:
"""Evaluate SLOs, combine health-check results, attach a resiliency report
to *chaos_telemetry* and return the report. Any failure is logged and *None*
is returned.
Args:
prometheus: Pre-configured KrknPrometheus client used for SLO queries.
chaos_telemetry: Telemetry object; its optional ``health_checks`` list is
read, and ``resiliency_report`` / ``resiliency_score`` are written to it.
start_time: Start of the evaluation window.
end_time: End of the evaluation window.
run_uuid: Optional run identifier, used only for logging.
alerts_yaml_path: Path to the SLO/alerts YAML definition file.
logger: Optional custom logger; defaults to this module's logger.
Returns:
The resiliency report dict, or None on any failure.
"""
log = logger or logging.getLogger(__name__)
try:
resiliency_obj = Resiliency(alerts_yaml_path)
# NOTE(review): writes the private ``_results`` attribute directly instead
# of going through a public API — confirm this is intentional coupling.
resiliency_obj._results = evaluate_slos(
prom_cli=prometheus,
slo_list=resiliency_obj._slos,
start_time=start_time,
end_time=end_time,
)
# Collect pass/fail flags from the telemetry's health checks, tolerating
# both attribute-style objects and plain dicts.
health_results: Dict[str, bool] = {}
hc_list = getattr(chaos_telemetry, "health_checks", None)
if hc_list:
for idx, hc in enumerate(hc_list):
# Extract URL/name; fall back to a positional placeholder name.
try:
name = getattr(hc, "url", None)
if name is None and isinstance(hc, dict):
name = hc.get("url", f"hc_{idx}")
except Exception:
name = f"hc_{idx}"
# Extract status; missing status defaults to True, an extraction
# error defaults to False (treated as a failed check).
try:
status = getattr(hc, "status", None)
if status is None and isinstance(hc, dict):
status = hc.get("status", True)
except Exception:
status = False
health_results[str(name)] = bool(status)
resiliency_obj.calculate_score(health_check_results=health_results)
resiliency_report = resiliency_obj.to_dict()
chaos_telemetry.resiliency_report = resiliency_report
chaos_telemetry.resiliency_score = resiliency_report.get("score")
# Monkey-patch ChaosRunTelemetry.to_json (once, guarded by a class flag)
# so serialized telemetry includes the resiliency fields.
if not hasattr(ChaosRunTelemetry, "_with_resiliency_patch"):
_orig_to_json = ChaosRunTelemetry.to_json
def _to_json_with_resiliency(self):
# Wraps the original serializer and injects resiliency fields;
# if the original output is not valid JSON, return it untouched.
raw_json = _orig_to_json(self)
try:
data = json.loads(raw_json)
except Exception:
return raw_json
if hasattr(self, "resiliency_report"):
data["resiliency_report"] = self.resiliency_report
if hasattr(self, "resiliency_score"):
data["resiliency_score"] = self.resiliency_score
return json.dumps(data)
ChaosRunTelemetry.to_json = _to_json_with_resiliency
ChaosRunTelemetry._with_resiliency_patch = True
log.info(
"Resiliency score for run %s: %s%%",
run_uuid or "<unknown>",
resiliency_report.get("score"),
)
return resiliency_report
except Exception as exc:
log.error("Failed to compute resiliency score: %s", exc)
return None
# -----------------------------------------------------------------------------
# Helper utilities extracted from run_kraken.py
# -----------------------------------------------------------------------------
from typing import Tuple
def add_scenario_reports(
*,
resiliency_obj: "Resiliency",
scenario_telemetries,
prom_cli: KrknPrometheus,
scenario_type: str,
batch_start_dt: datetime.datetime,
batch_end_dt: datetime.datetime,
weight: int | float = 1,
logger: Optional[logging.Logger] = None,
) -> None:
"""Evaluate SLOs for every telemetry item belonging to a scenario window,
store the result in *resiliency_obj* and enrich the telemetry list with a
compact resiliency breakdown.
Args:
resiliency_obj: Initialized :class:`Resiliency` orchestrator. If *None*,
the call becomes a no-op (saves caller side checks).
scenario_telemetries: Iterable with telemetry objects/dicts for the
current scenario batch window.
prom_cli: Pre-configured :class:`KrknPrometheus` instance.
scenario_type: Fallback scenario identifier in case individual
telemetry items do not provide one.
batch_start_dt: Fallback start timestamp for the batch window.
batch_end_dt: Fallback end timestamp for the batch window.
weight: Weight to assign to every scenario when calculating the final
weighted average.
logger: Optional custom logger.
"""
if resiliency_obj is None:
return
log = logger or logging.getLogger(__name__)
# Each item is processed independently; a failure on one telemetry item is
# logged and must not abort the remaining items in the batch.
for tel in scenario_telemetries:
try:
# -------- Extract timestamps & scenario name --------------------
# Telemetry items may be plain dicts or objects with attributes.
if isinstance(tel, dict):
st_ts = tel.get("start_timestamp")
en_ts = tel.get("end_timestamp")
scen_name = tel.get("scenario", scenario_type)
else:
st_ts = getattr(tel, "start_timestamp", None)
en_ts = getattr(tel, "end_timestamp", None)
scen_name = getattr(tel, "scenario", scenario_type)
if st_ts and en_ts:
# NOTE(review): fromtimestamp() yields naive local-time datetimes;
# assumes timestamps are epoch seconds — confirm against producers.
st_dt = datetime.datetime.fromtimestamp(int(st_ts))
en_dt = datetime.datetime.fromtimestamp(int(en_ts))
else:
# Fall back to the batch window when per-item timestamps are missing.
st_dt = batch_start_dt
en_dt = batch_end_dt
# -------- Calculate resiliency score for the scenario -----------
resiliency_obj.add_scenario_report(
scenario_name=str(scen_name),
prom_cli=prom_cli,
start_time=st_dt,
end_time=en_dt,
weight=weight,
health_check_results=None,
)
# Attach a compact view of the report just appended above.
compact = Resiliency.compact_breakdown(
resiliency_obj.scenario_reports[-1]
)
if isinstance(tel, dict):
tel["resiliency_report"] = compact
else:
setattr(tel, "resiliency_report", compact)
except Exception as exc:
log.error("Resiliency per-scenario evaluation failed: %s", exc)
def finalize_and_save(
*,
resiliency_obj: "Resiliency",
prom_cli: KrknPrometheus,
total_start_time: datetime.datetime,
total_end_time: datetime.datetime,
run_mode: str = "standalone",
summary_path: str = "kraken.report",
detailed_path: str = "resiliency-report.json",
logger: Optional[logging.Logger] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Finalize resiliency scoring, persist reports and return them.
Args:
resiliency_obj: Initialized Resiliency orchestrator; None makes this a no-op.
prom_cli: Pre-configured KrknPrometheus instance.
total_start_time: Start time for the full test window.
total_end_time: End time for the full test window.
run_mode: "controller" emits the detailed report on stdout for krknctl;
any other value writes both reports to files.
summary_path: Destination for the summary report (stand-alone mode).
detailed_path: Destination for the detailed report (stand-alone mode).
logger: Optional custom logger.
Returns:
(summary_report, detailed_report); ({}, {}) when resiliency_obj is None
or finalization fails.
"""
if resiliency_obj is None:
return {}, {}
log = logger or logging.getLogger(__name__)
try:
resiliency_obj.finalize_report(
prom_cli=prom_cli,
total_start_time=total_start_time,
total_end_time=total_end_time,
)
summary = resiliency_obj.get_summary()
detailed = resiliency_obj.get_detailed_report()
if run_mode == "controller":
# krknctl expects the detailed report on stdout in a special format
try:
detailed_json = json.dumps(detailed)
print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
log.info("Resiliency report logged to stdout for krknctl.")
except Exception as exc:
log.error("Failed to serialize and log detailed resiliency report: %s", exc)
else:
# Stand-alone mode: write to files for post-run consumption
try:
with open(summary_path, "w", encoding="utf-8") as fp:
json.dump(summary, fp, indent=2)
with open(detailed_path, "w", encoding="utf-8") as fp:
json.dump(detailed, fp, indent=2)
log.info("Resiliency reports written: %s and %s", summary_path, detailed_path)
except Exception as io_exc:
# File-write problems are non-fatal; the reports are still returned.
log.error("Failed to write resiliency report files: %s", io_exc)
return summary, detailed
except Exception as exc:
# Best-effort: resiliency scoring must never abort the chaos run.
log.error("Failed to finalize resiliency scoring: %s", exc)
return {}, {}

View File

@@ -8,53 +8,62 @@ DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}
class SLOResult:
"""Simple container representing evaluation outcome for a single SLO."""
def __init__(self, name: str, severity: str, passed: bool):
def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
self.name = name
self.severity = severity
self.passed = passed
self._custom_weight = weight
def weight(self, weights: Dict[str, int] | None = None) -> int:
_w = weights or DEFAULT_WEIGHTS
return _w.get(self.severity, DEFAULT_WEIGHTS["warning"])
def weight(self, severity_weights: Dict[str, int]) -> int:
"""Return the weight for this SLO. Uses custom weight if set, otherwise uses severity-based weight."""
if self._custom_weight is not None:
return self._custom_weight
return severity_weights.get(self.severity, severity_weights.get("warning", 1))
def calculate_resiliency_score(
slo_definitions: Dict[str, str],
slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
prometheus_results: Dict[str, bool],
health_check_results: Dict[str, bool],
weights: Dict[str, int] | None = None,
) -> Tuple[int, Dict[str, int]]:
"""Compute a resiliency score between 0-100 based on SLO pass/fail results.
Args:
slo_definitions: Mapping of SLO name -> severity ("critical" | "warning").
slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
SLO name -> {"severity": str, "weight": int | None}.
prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
passed. Any SLO missing in this mapping is treated as failed.
health_check_results: Mapping of custom health-check name -> bool pass flag.
These checks are always treated as *critical*.
weights: Optional override of severity weights.
Returns:
Tuple containing (final_score, breakdown) where *breakdown* is a dict with
the counts of passed/failed SLOs per severity.
"""
weights = weights or DEFAULT_WEIGHTS
slo_objects: List[SLOResult] = []
for slo_name, severity in slo_definitions.items():
for slo_name, slo_def in slo_definitions.items():
# Exclude SLOs that were not evaluated (query returned no data)
if slo_name not in prometheus_results:
continue
passed = bool(prometheus_results[slo_name])
slo_objects.append(SLOResult(slo_name, severity, passed))
# Support both old format (str) and new format (dict)
if isinstance(slo_def, str):
severity = slo_def
slo_weight = None
else:
severity = slo_def.get("severity", "warning")
slo_weight = slo_def.get("weight")
slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
# Health-check SLOs (by default keeping them critical)
for hc_name, hc_passed in health_check_results.items():
slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
total_points = sum(slo.weight(weights) for slo in slo_objects)
points_lost = sum(slo.weight(weights) for slo in slo_objects if not slo.passed)
total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)

View File

@@ -22,10 +22,7 @@ from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.prometheus as prometheus_plugin
import server as server
from krkn.resiliency.resiliency import (
Resiliency,
compute_resiliency,
add_scenario_reports,
finalize_and_save,
Resiliency
)
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
@@ -61,12 +58,7 @@ def main(options, command: Optional[str]) -> int:
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
# Determine execution mode (standalone, controller, or disabled)
run_mode = (os.getenv("RESILIENCY_ENABLED_MODE") or "standalone").lower().strip()
valid_run_modes = {"standalone", "controller", "disabled"}
if run_mode not in valid_run_modes:
logging.warning("Unknown RESILIENCY_ENABLED_MODE '%s'. Defaulting to 'standalone'", run_mode)
run_mode = "standalone"
cfg = options.cfg
# Parse and read the config
@@ -79,6 +71,7 @@ def main(options, command: Optional[str]) -> int:
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
)
kraken_config = cfg
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
publish_running_status = get_yaml_item_value(
config["kraken"], "publish_kraken_status", False
@@ -100,6 +93,13 @@ def main(options, command: Optional[str]) -> int:
config["kraken"], "signal_address", "0.0.0.0"
)
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
# Determine execution mode (standalone, controller, or disabled)
run_mode = get_yaml_item_value(config["resiliency"], "resiliency_run_mode", "standalone")
valid_run_modes = {"standalone", "controller", "disabled"}
if run_mode not in valid_run_modes:
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
run_mode = "standalone"
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
@@ -282,8 +282,8 @@ def main(options, command: Optional[str]) -> int:
except Exception as prom_exc:
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
run_mode = "disabled"
resiliency_obj = Resiliency() if run_mode != "disabled" else None # Initialize resiliency orchestrator
resiliency_alerts = get_yaml_item_value(config['resiliency'], "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
logging.info("Server URL: %s" % kubecli.get_host())
if command == "list-rollback":
@@ -410,8 +410,7 @@ def main(options, command: Optional[str]) -> int:
chaos_telemetry.scenarios.extend(scenario_telemetries)
batch_window_end_dt = datetime.datetime.utcnow()
if resiliency_obj:
add_scenario_reports(
resiliency_obj=resiliency_obj,
resiliency_obj.add_scenario_reports(
scenario_telemetries=scenario_telemetries,
prom_cli=prometheus,
scenario_type=scenario_type,
@@ -490,13 +489,11 @@ def main(options, command: Optional[str]) -> int:
if resiliency_obj:
try:
summary_report, detailed_report = finalize_and_save(
resiliency_obj=resiliency_obj,
resiliency_obj.finalize_and_save(
prom_cli=prometheus,
total_start_time=datetime.datetime.fromtimestamp(start_time),
total_end_time=datetime.datetime.fromtimestamp(end_time),
run_mode=run_mode,
logger=logging,
)
except Exception as e: