mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-03-16 16:40:42 +00:00
Compare commits
5 Commits
main
...
v5.0.1-bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9d7c8ba12 | ||
|
|
e8075743ab | ||
|
|
ec5511b2db | ||
|
|
4e7dca9474 | ||
|
|
edf0f3d1c9 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -17,6 +17,7 @@ __pycache__/*
|
||||
kube-burner*
|
||||
kube_burner*
|
||||
recommender_*.json
|
||||
resiliency*.json
|
||||
|
||||
# Project files
|
||||
.ropeproject
|
||||
|
||||
@@ -131,4 +131,5 @@ kubevirt_checks: # Utilizing virt che
|
||||
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
|
||||
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
|
||||
node_names: ""
|
||||
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
|
||||
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
|
||||
|
||||
|
||||
79
krkn/prometheus/collector.py
Normal file
79
krkn/prometheus/collector.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# SLO evaluation helpers (used by krkn.resiliency)
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
|
||||
if not prometheus_result:
|
||||
return None
|
||||
has_samples = False
|
||||
for series in prometheus_result:
|
||||
if "values" in series:
|
||||
has_samples = True
|
||||
for _ts, val in series["values"]:
|
||||
try:
|
||||
if float(val) > 0:
|
||||
return False
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
elif "value" in series:
|
||||
has_samples = True
|
||||
try:
|
||||
return float(series["value"][1]) == 0
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
# If we reached here and never saw any samples, skip
|
||||
return None if not has_samples else True
|
||||
|
||||
|
||||
def evaluate_slos(
|
||||
prom_cli: KrknPrometheus,
|
||||
slo_list: List[Dict[str, Any]],
|
||||
start_time: datetime.datetime,
|
||||
end_time: datetime.datetime,
|
||||
) -> Dict[str, bool]:
|
||||
"""Evaluate a list of SLO expressions against Prometheus.
|
||||
|
||||
Args:
|
||||
prom_cli: Configured Prometheus client.
|
||||
slo_list: List of dicts with keys ``name``, ``expr``.
|
||||
start_time: Start timestamp.
|
||||
end_time: End timestamp.
|
||||
granularity: Step in seconds for range queries.
|
||||
Returns:
|
||||
Mapping name -> bool indicating pass status.
|
||||
True means good we passed the SLO test otherwise failed the SLO
|
||||
"""
|
||||
results: Dict[str, bool] = {}
|
||||
logging.info("Evaluating %d SLOs over window %s – %s", len(slo_list), start_time, end_time)
|
||||
for slo in slo_list:
|
||||
expr = slo["expr"]
|
||||
name = slo["name"]
|
||||
try:
|
||||
response = prom_cli.process_prom_query_in_range(
|
||||
expr,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
)
|
||||
|
||||
passed = slo_passed(response)
|
||||
if passed is None:
|
||||
# Absence of data indicates the condition did not trigger; treat as pass.
|
||||
logging.debug("SLO '%s' query returned no data; assuming pass.", name)
|
||||
results[name] = True
|
||||
else:
|
||||
results[name] = passed
|
||||
except Exception as exc:
|
||||
logging.error("PromQL query failed for SLO '%s': %s", name, exc)
|
||||
results[name] = False
|
||||
return results
|
||||
4
krkn/resiliency/__init__.py
Normal file
4
krkn/resiliency/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""krkn.resiliency package public interface."""
|
||||
|
||||
from .resiliency import Resiliency # noqa: F401
|
||||
from .score import calculate_resiliency_score # noqa: F401
|
||||
366
krkn/resiliency/resiliency.py
Normal file
366
krkn/resiliency/resiliency.py
Normal file
@@ -0,0 +1,366 @@
|
||||
"""Resiliency evaluation orchestrator for Krkn chaos runs.
|
||||
|
||||
This module provides the `Resiliency` class which loads the canonical
|
||||
`alerts.yaml`, executes every SLO expression against Prometheus in the
|
||||
chaos-test time window, determines pass/fail status and calculates an
|
||||
overall resiliency score using the generic weighted model implemented
|
||||
in `krkn.resiliency.score`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
|
||||
import yaml
|
||||
import json
|
||||
import dataclasses
|
||||
from krkn_lib.models.telemetry import ChaosRunTelemetry
|
||||
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
from krkn.prometheus.collector import evaluate_slos
|
||||
from krkn.resiliency.score import calculate_resiliency_score
|
||||
|
||||
|
||||
class Resiliency:
|
||||
"""Central orchestrator for resiliency scoring."""
|
||||
|
||||
def __init__(self, alerts_yaml_path: str):
|
||||
|
||||
if not os.path.exists(alerts_yaml_path):
|
||||
raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
|
||||
with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
|
||||
raw_yaml_data = yaml.safe_load(fp)
|
||||
logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
|
||||
|
||||
self._slos = self._normalise_alerts(raw_yaml_data)
|
||||
self._results: Dict[str, bool] = {}
|
||||
self._score: Optional[int] = None
|
||||
self._breakdown: Optional[Dict[str, int]] = None
|
||||
self._health_check_results: Dict[str, bool] = {}
|
||||
self.scenario_reports: List[Dict[str, Any]] = []
|
||||
self.summary: Optional[Dict[str, Any]] = None
|
||||
self.detailed_report: Optional[Dict[str, Any]] = None
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
def calculate_score(
|
||||
self,
|
||||
*,
|
||||
health_check_results: Optional[Dict[str, bool]] = None,
|
||||
) -> int:
|
||||
"""Calculate the resiliency score using collected SLO results."""
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=self._results,
|
||||
health_check_results=health_check_results or {},
|
||||
)
|
||||
self._score = score
|
||||
self._breakdown = breakdown
|
||||
self._health_check_results = health_check_results or {}
|
||||
return score
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Return a dictionary ready for telemetry output."""
|
||||
if self._score is None:
|
||||
raise RuntimeError("calculate_score() must be called before to_dict()")
|
||||
return {
|
||||
"score": self._score,
|
||||
"breakdown": self._breakdown,
|
||||
"slo_results": self._results,
|
||||
"health_check_results": getattr(self, "_health_check_results", {}),
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Scenario-based resiliency evaluation
|
||||
# ------------------------------------------------------------------
|
||||
def add_scenario_report(
|
||||
self,
|
||||
*,
|
||||
scenario_name: str,
|
||||
prom_cli: KrknPrometheus,
|
||||
start_time: datetime.datetime,
|
||||
end_time: datetime.datetime,
|
||||
weight: float | int = 1,
|
||||
health_check_results: Optional[Dict[str, bool]] = None,
|
||||
) -> int:
|
||||
"""
|
||||
Evaluate SLOs for a single scenario window and store the result.
|
||||
|
||||
Args:
|
||||
scenario_name: Human-friendly scenario identifier.
|
||||
prom_cli: Initialized KrknPrometheus instance.
|
||||
start_time: Window start.
|
||||
end_time: Window end.
|
||||
weight: Weight to use for the final weighted average calculation.
|
||||
health_check_results: Optional mapping of custom health-check name ➡ bool.
|
||||
Returns:
|
||||
The calculated integer resiliency score (0-100) for this scenario.
|
||||
"""
|
||||
slo_results = evaluate_slos(
|
||||
prom_cli=prom_cli,
|
||||
slo_list=self._slos,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
)
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=slo_results,
|
||||
health_check_results=health_check_results or {},
|
||||
)
|
||||
self.scenario_reports.append(
|
||||
{
|
||||
"name": scenario_name,
|
||||
"window": {
|
||||
"start": start_time.isoformat(),
|
||||
"end": end_time.isoformat(),
|
||||
},
|
||||
"score": score,
|
||||
"weight": weight,
|
||||
"breakdown": breakdown,
|
||||
"slo_results": slo_results,
|
||||
"health_check_results": health_check_results or {},
|
||||
}
|
||||
)
|
||||
return score
|
||||
|
||||
def finalize_report(
|
||||
self,
|
||||
*,
|
||||
prom_cli: KrknPrometheus,
|
||||
total_start_time: datetime.datetime,
|
||||
total_end_time: datetime.datetime,
|
||||
) -> None:
|
||||
if not self.scenario_reports:
|
||||
raise RuntimeError("No scenario reports added – nothing to finalize")
|
||||
|
||||
# ---------------- Weighted average (primary resiliency_score) ----------
|
||||
total_weight = sum(rep["weight"] for rep in self.scenario_reports)
|
||||
resiliency_score = int(
|
||||
sum(rep["score"] * rep["weight"] for rep in self.scenario_reports) / total_weight
|
||||
)
|
||||
|
||||
# ---------------- Overall SLO evaluation across full test window -----------------------------
|
||||
full_slo_results = evaluate_slos(
|
||||
prom_cli=prom_cli,
|
||||
slo_list=self._slos,
|
||||
start_time=total_start_time,
|
||||
end_time=total_end_time,
|
||||
)
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
_overall_score, full_breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=full_slo_results,
|
||||
health_check_results={},
|
||||
)
|
||||
|
||||
self.summary = {
|
||||
"scenarios": {rep["name"]: rep["score"] for rep in self.scenario_reports},
|
||||
"resiliency_score": resiliency_score,
|
||||
"passed_slos": full_breakdown.get("passed", 0),
|
||||
"total_slos": full_breakdown.get("passed", 0) + full_breakdown.get("failed", 0),
|
||||
}
|
||||
|
||||
# Detailed report currently limited to per-scenario information; system stability section removed
|
||||
self.detailed_report = {
|
||||
"scenarios": self.scenario_reports,
|
||||
}
|
||||
|
||||
def get_summary(self) -> Dict[str, Any]:
|
||||
"""Return the concise resiliency_summary structure."""
|
||||
if not hasattr(self, "summary") or self.summary is None:
|
||||
raise RuntimeError("finalize_report() must be called first")
|
||||
return self.summary
|
||||
|
||||
def get_detailed_report(self) -> Dict[str, Any]:
|
||||
"""Return the full resiliency-report structure."""
|
||||
if not hasattr(self, "detailed_report") or self.detailed_report is None:
|
||||
raise RuntimeError("finalize_report() must be called first")
|
||||
return self.detailed_report
|
||||
|
||||
@staticmethod
|
||||
def compact_breakdown(report: Dict[str, Any]) -> Dict[str, int]:
|
||||
"""Return a compact summary dict for a single scenario report."""
|
||||
try:
|
||||
passed = report["breakdown"]["passed"]
|
||||
failed = report["breakdown"]["failed"]
|
||||
score_val = report["score"]
|
||||
except Exception:
|
||||
passed = report.get("breakdown", {}).get("passed", 0)
|
||||
failed = report.get("breakdown", {}).get("failed", 0)
|
||||
score_val = report.get("score", 0)
|
||||
return {
|
||||
"resiliency_score": score_val,
|
||||
"passed_slos": passed,
|
||||
"total_slos": passed + failed,
|
||||
}
|
||||
|
||||
def attach_compact_to_telemetry(self, chaos_telemetry: ChaosRunTelemetry) -> None:
|
||||
"""Embed per-scenario compact resiliency reports into a ChaosRunTelemetry instance."""
|
||||
score_map = {
|
||||
rep["name"]: self.compact_breakdown(rep) for rep in self.scenario_reports
|
||||
}
|
||||
new_scenarios = []
|
||||
for item in getattr(chaos_telemetry, "scenarios", []):
|
||||
if isinstance(item, dict):
|
||||
name = item.get("scenario")
|
||||
if name in score_map:
|
||||
item["resiliency_report"] = score_map[name]
|
||||
new_scenarios.append(item)
|
||||
else:
|
||||
name = getattr(item, "scenario", None)
|
||||
try:
|
||||
item_dict = dataclasses.asdict(item)
|
||||
except Exception:
|
||||
item_dict = {
|
||||
k: getattr(item, k)
|
||||
for k in dir(item)
|
||||
if not k.startswith("__") and not callable(getattr(item, k))
|
||||
}
|
||||
if name in score_map:
|
||||
item_dict["resiliency_report"] = score_map[name]
|
||||
new_scenarios.append(item_dict)
|
||||
chaos_telemetry.scenarios = new_scenarios
|
||||
|
||||
def add_scenario_reports(
|
||||
self,
|
||||
*,
|
||||
scenario_telemetries,
|
||||
prom_cli: KrknPrometheus,
|
||||
scenario_type: str,
|
||||
batch_start_dt: datetime.datetime,
|
||||
batch_end_dt: datetime.datetime,
|
||||
weight: int | float = 1,
|
||||
) -> None:
|
||||
"""Evaluate SLOs for every telemetry item belonging to a scenario window,
|
||||
store the result and enrich the telemetry list with a compact resiliency breakdown.
|
||||
|
||||
Args:
|
||||
scenario_telemetries: Iterable with telemetry objects/dicts for the
|
||||
current scenario batch window.
|
||||
prom_cli: Pre-configured :class:`KrknPrometheus` instance.
|
||||
scenario_type: Fallback scenario identifier in case individual
|
||||
telemetry items do not provide one.
|
||||
batch_start_dt: Fallback start timestamp for the batch window.
|
||||
batch_end_dt: Fallback end timestamp for the batch window.
|
||||
weight: Weight to assign to every scenario when calculating the final
|
||||
weighted average.
|
||||
logger: Optional custom logger.
|
||||
"""
|
||||
|
||||
for tel in scenario_telemetries:
|
||||
try:
|
||||
# -------- Extract timestamps & scenario name --------------------
|
||||
if isinstance(tel, dict):
|
||||
st_ts = tel.get("start_timestamp")
|
||||
en_ts = tel.get("end_timestamp")
|
||||
scen_name = tel.get("scenario", scenario_type)
|
||||
else:
|
||||
st_ts = getattr(tel, "start_timestamp", None)
|
||||
en_ts = getattr(tel, "end_timestamp", None)
|
||||
scen_name = getattr(tel, "scenario", scenario_type)
|
||||
|
||||
if st_ts and en_ts:
|
||||
st_dt = datetime.datetime.fromtimestamp(int(st_ts))
|
||||
en_dt = datetime.datetime.fromtimestamp(int(en_ts))
|
||||
else:
|
||||
st_dt = batch_start_dt
|
||||
en_dt = batch_end_dt
|
||||
|
||||
# -------- Calculate resiliency score for the scenario -----------
|
||||
self.add_scenario_report(
|
||||
scenario_name=str(scen_name),
|
||||
prom_cli=prom_cli,
|
||||
start_time=st_dt,
|
||||
end_time=en_dt,
|
||||
weight=weight,
|
||||
health_check_results=None,
|
||||
)
|
||||
|
||||
compact = self.compact_breakdown(self.scenario_reports[-1])
|
||||
if isinstance(tel, dict):
|
||||
tel["resiliency_report"] = compact
|
||||
else:
|
||||
setattr(tel, "resiliency_report", compact)
|
||||
except Exception as exc:
|
||||
logging.error("Resiliency per-scenario evaluation failed: %s", exc)
|
||||
|
||||
def finalize_and_save(
|
||||
self,
|
||||
*,
|
||||
prom_cli: KrknPrometheus,
|
||||
total_start_time: datetime.datetime,
|
||||
total_end_time: datetime.datetime,
|
||||
run_mode: str = "standalone",
|
||||
detailed_path: str = "resiliency-report.json",
|
||||
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
||||
"""Finalize resiliency scoring, persist reports and return them.
|
||||
|
||||
Args:
|
||||
prom_cli: Pre-configured KrknPrometheus instance.
|
||||
total_start_time: Start time for the full test window.
|
||||
total_end_time: End time for the full test window.
|
||||
run_mode: "controller" or "standalone" mode.
|
||||
|
||||
Returns:
|
||||
(detailed_report)
|
||||
"""
|
||||
|
||||
try:
|
||||
self.finalize_report(
|
||||
prom_cli=prom_cli,
|
||||
total_start_time=total_start_time,
|
||||
total_end_time=total_end_time,
|
||||
)
|
||||
detailed = self.get_detailed_report()
|
||||
|
||||
if run_mode == "controller":
|
||||
# krknctl expects the detailed report on stdout in a special format
|
||||
try:
|
||||
detailed_json = json.dumps(detailed)
|
||||
print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
|
||||
logging.info("Resiliency report logged to stdout for krknctl.")
|
||||
except Exception as exc:
|
||||
logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
|
||||
else:
|
||||
# Stand-alone mode – write to files for post-run consumption
|
||||
try:
|
||||
with open(detailed_path, "w", encoding="utf-8") as fp:
|
||||
json.dump(detailed, fp, indent=2)
|
||||
logging.info("Resiliency report written: %s", detailed_path)
|
||||
except Exception as io_exc:
|
||||
logging.error("Failed to write resiliency report files: %s", io_exc)
|
||||
|
||||
except Exception as exc:
|
||||
logging.error("Failed to finalize resiliency scoring: %s", exc)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
@staticmethod
|
||||
def _normalise_alerts(raw_alerts: Any) -> List[Dict[str, Any]]:
|
||||
"""Convert raw YAML alerts data into internal SLO list structure."""
|
||||
if not isinstance(raw_alerts, list):
|
||||
raise ValueError("SLO configuration must be a list under key 'slos' or top-level list")
|
||||
|
||||
slos: List[Dict[str, Any]] = []
|
||||
for idx, alert in enumerate(raw_alerts):
|
||||
if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
|
||||
logging.warning("Skipping invalid alert entry at index %d: %s", idx, alert)
|
||||
continue
|
||||
name = alert.get("description") or f"slo_{idx}"
|
||||
slos.append(
|
||||
{
|
||||
"name": name,
|
||||
"expr": alert["expr"],
|
||||
"severity": str(alert["severity"]).lower(),
|
||||
"weight": alert.get("weight")
|
||||
}
|
||||
)
|
||||
return slos
|
||||
76
krkn/resiliency/score.py
Normal file
76
krkn/resiliency/score.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}
|
||||
|
||||
|
||||
class SLOResult:
|
||||
"""Simple container representing evaluation outcome for a single SLO."""
|
||||
|
||||
def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
|
||||
self.name = name
|
||||
self.severity = severity
|
||||
self.passed = passed
|
||||
self._custom_weight = weight
|
||||
|
||||
def weight(self, severity_weights: Dict[str, int]) -> int:
|
||||
"""Return the weight for this SLO. Uses custom weight if set, otherwise uses severity-based weight."""
|
||||
if self._custom_weight is not None:
|
||||
return self._custom_weight
|
||||
return severity_weights.get(self.severity, severity_weights.get("warning", 1))
|
||||
|
||||
|
||||
def calculate_resiliency_score(
|
||||
slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
|
||||
prometheus_results: Dict[str, bool],
|
||||
health_check_results: Dict[str, bool],
|
||||
) -> Tuple[int, Dict[str, int]]:
|
||||
"""Compute a resiliency score between 0-100 based on SLO pass/fail results.
|
||||
|
||||
Args:
|
||||
slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
|
||||
SLO name -> {"severity": str, "weight": int | None}.
|
||||
prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
|
||||
passed. Any SLO missing in this mapping is treated as failed.
|
||||
health_check_results: Mapping of custom health-check name -> bool pass flag.
|
||||
These checks are always treated as *critical*.
|
||||
|
||||
Returns:
|
||||
Tuple containing (final_score, breakdown) where *breakdown* is a dict with
|
||||
the counts of passed/failed SLOs per severity.
|
||||
"""
|
||||
|
||||
slo_objects: List[SLOResult] = []
|
||||
for slo_name, slo_def in slo_definitions.items():
|
||||
# Exclude SLOs that were not evaluated (query returned no data)
|
||||
if slo_name not in prometheus_results:
|
||||
continue
|
||||
passed = bool(prometheus_results[slo_name])
|
||||
|
||||
# Support both old format (str) and new format (dict)
|
||||
if isinstance(slo_def, str):
|
||||
severity = slo_def
|
||||
slo_weight = None
|
||||
else:
|
||||
severity = slo_def.get("severity", "warning")
|
||||
slo_weight = slo_def.get("weight")
|
||||
|
||||
slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
|
||||
|
||||
# Health-check SLOs (by default keeping them critical)
|
||||
for hc_name, hc_passed in health_check_results.items():
|
||||
slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
|
||||
|
||||
total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
|
||||
points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
|
||||
|
||||
score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)
|
||||
|
||||
breakdown = {
|
||||
"total_points": total_points,
|
||||
"points_lost": points_lost,
|
||||
"passed": len([s for s in slo_objects if s.passed]),
|
||||
"failed": len([s for s in slo_objects if not s.passed]),
|
||||
}
|
||||
return score, breakdown
|
||||
@@ -12,7 +12,7 @@ import uuid
|
||||
import time
|
||||
import queue
|
||||
import threading
|
||||
from typing import Optional
|
||||
from typing import Optional, Dict
|
||||
|
||||
from krkn import cerberus
|
||||
from krkn_lib.elastic.krkn_elastic import KrknElastic
|
||||
@@ -21,11 +21,15 @@ from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
import krkn.prometheus as prometheus_plugin
|
||||
import server as server
|
||||
from krkn.resiliency.resiliency import (
|
||||
Resiliency
|
||||
)
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.ocp import KrknOpenshift
|
||||
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.models.telemetry import ChaosRunTelemetry
|
||||
from krkn_lib.models.k8s import ResiliencyReport
|
||||
from krkn_lib.utils import SafeLogger
|
||||
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
|
||||
|
||||
@@ -54,6 +58,8 @@ def main(options, command: Optional[str]) -> int:
|
||||
print(pyfiglet.figlet_format("kraken"))
|
||||
logging.info("Starting kraken")
|
||||
|
||||
|
||||
|
||||
cfg = options.cfg
|
||||
# Parse and read the config
|
||||
if os.path.isfile(cfg):
|
||||
@@ -65,6 +71,7 @@ def main(options, command: Optional[str]) -> int:
|
||||
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
|
||||
)
|
||||
kraken_config = cfg
|
||||
|
||||
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
|
||||
publish_running_status = get_yaml_item_value(
|
||||
config["kraken"], "publish_kraken_status", False
|
||||
@@ -86,14 +93,20 @@ def main(options, command: Optional[str]) -> int:
|
||||
config["kraken"], "signal_address", "0.0.0.0"
|
||||
)
|
||||
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
|
||||
|
||||
resiliency_config = get_yaml_item_value(config,"resiliency",{})
|
||||
# Determine execution mode (standalone, controller, or disabled)
|
||||
run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
|
||||
valid_run_modes = {"standalone", "detailed", "disabled"}
|
||||
if run_mode not in valid_run_modes:
|
||||
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
|
||||
run_mode = "standalone"
|
||||
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
|
||||
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
|
||||
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
|
||||
|
||||
prometheus_url = config["performance_monitoring"].get("prometheus_url")
|
||||
prometheus_bearer_token = config["performance_monitoring"].get(
|
||||
"prometheus_bearer_token"
|
||||
)
|
||||
prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
|
||||
run_uuid = config["performance_monitoring"].get("uuid")
|
||||
enable_alerts = get_yaml_item_value(
|
||||
config["performance_monitoring"], "enable_alerts", False
|
||||
@@ -101,6 +114,10 @@ def main(options, command: Optional[str]) -> int:
|
||||
enable_metrics = get_yaml_item_value(
|
||||
config["performance_monitoring"], "enable_metrics", False
|
||||
)
|
||||
|
||||
|
||||
# Default placeholder; will be overridden if a Prometheus URL is available
|
||||
prometheus = None
|
||||
# elastic search
|
||||
enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
|
||||
elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
|
||||
@@ -231,6 +248,11 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
logging.info("Cluster version CRD not detected, skipping")
|
||||
|
||||
# Final check: ensure Prometheus URL is available; disable resiliency if not
|
||||
if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
|
||||
logging.warning("Prometheus URL not provided; disabling resiliency score features.")
|
||||
run_mode = "disabled"
|
||||
|
||||
# KrknTelemetry init
|
||||
telemetry_k8s = KrknTelemetryKubernetes(
|
||||
safe_logger, kubecli, config["telemetry"]
|
||||
@@ -251,9 +273,18 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
elastic_search = None
|
||||
summary = ChaosRunAlertSummary()
|
||||
if enable_metrics or enable_alerts or check_critical_alerts:
|
||||
if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
|
||||
prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
|
||||
|
||||
# Quick connectivity probe for Prometheus – disable resiliency if unreachable
|
||||
try:
|
||||
prometheus.process_prom_query_in_range(
|
||||
"up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
|
||||
)
|
||||
except Exception as prom_exc:
|
||||
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
|
||||
run_mode = "disabled"
|
||||
resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
|
||||
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
|
||||
logging.info("Server URL: %s" % kubecli.get_host())
|
||||
|
||||
if command == "list-rollback":
|
||||
@@ -369,6 +400,8 @@ def main(options, command: Optional[str]) -> int:
|
||||
)
|
||||
sys.exit(-1)
|
||||
|
||||
|
||||
batch_window_start_dt = datetime.datetime.utcnow()
|
||||
failed_scenarios_current, scenario_telemetries = (
|
||||
scenario_plugin.run_scenarios(
|
||||
run_uuid, scenarios_list, config, telemetry_ocp
|
||||
@@ -376,6 +409,15 @@ def main(options, command: Optional[str]) -> int:
|
||||
)
|
||||
failed_post_scenarios.extend(failed_scenarios_current)
|
||||
chaos_telemetry.scenarios.extend(scenario_telemetries)
|
||||
batch_window_end_dt = datetime.datetime.utcnow()
|
||||
if resiliency_obj:
|
||||
resiliency_obj.add_scenario_reports(
|
||||
scenario_telemetries=scenario_telemetries,
|
||||
prom_cli=prometheus,
|
||||
scenario_type=scenario_type,
|
||||
batch_start_dt=batch_window_start_dt,
|
||||
batch_end_dt=batch_window_end_dt,
|
||||
)
|
||||
|
||||
post_critical_alerts = 0
|
||||
if check_critical_alerts:
|
||||
@@ -440,12 +482,41 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
logging.info("No error logs collected during chaos run")
|
||||
chaos_telemetry.error_logs = []
|
||||
if resiliency_obj:
|
||||
try:
|
||||
resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
|
||||
except Exception as exc:
|
||||
logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)
|
||||
|
||||
if resiliency_obj:
|
||||
try:
|
||||
resiliency_obj.finalize_and_save(
|
||||
prom_cli=prometheus,
|
||||
total_start_time=datetime.datetime.fromtimestamp(start_time),
|
||||
total_end_time=datetime.datetime.fromtimestamp(end_time),
|
||||
run_mode=run_mode,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Failed to finalize resiliency scoring: %s", e)
|
||||
|
||||
|
||||
telemetry_json = chaos_telemetry.to_json()
|
||||
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
|
||||
if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
|
||||
summary_dict = resiliency_obj.get_summary()
|
||||
decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
|
||||
json_object=summary_dict,
|
||||
resiliency_score=summary_dict.get("resiliency_score", 0),
|
||||
passed_slos=summary_dict.get("passed_slos", 0),
|
||||
total_slos=summary_dict.get("total_slos", 0)
|
||||
)
|
||||
chaos_output.telemetry = decoded_chaos_run_telemetry
|
||||
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
|
||||
if enable_elastic:
|
||||
elastic_telemetry = ElasticChaosRunTelemetry(
|
||||
chaos_run_telemetry=decoded_chaos_run_telemetry
|
||||
)
|
||||
result = elastic_search.push_telemetry(
|
||||
decoded_chaos_run_telemetry, elastic_telemetry_index
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user