Compare commits

...

5 Commits

Author SHA1 Message Date
Paige Patton
b9d7c8ba12 if no scenarios run 2026-03-11 13:58:50 -04:00
Paige Patton
e8075743ab adding resiliency config default 2026-03-11 13:58:50 -04:00
Paige Patton
ec5511b2db custom weight 2026-03-11 13:58:50 -04:00
Abhinav Sharma
4e7dca9474 fix: check prometheus url after openshift prometheus check
Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-11 13:58:49 -04:00
Abhinav Sharma
edf0f3d1c9 feat(resiliency): implement comprehensive resiliency scoring system
- Added resiliency scoring engine
- Implemented scenario-wise scoring with telemetry
- Added configurable SLOs and detailed reporting

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-11 13:58:34 -04:00
7 changed files with 605 additions and 7 deletions

1
.gitignore vendored
View File

@@ -17,6 +17,7 @@ __pycache__/*
kube-burner*
kube_burner*
recommender_*.json
resiliency*.json
# Project files
.ropeproject

View File

@@ -131,4 +131,5 @@ kubevirt_checks: # Utilizing virt che
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
node_names: ""
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
import datetime
import logging
from typing import Dict, Any, List, Optional
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
# -----------------------------------------------------------------------------
# SLO evaluation helpers (used by krkn.resiliency)
# -----------------------------------------------------------------------------
def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
    """Decide whether an SLO query result represents a passing SLO.

    The convention (see ``evaluate_slos`` below) is that an SLO expression
    returns the *violation* signal: any sample > 0 means the SLO failed,
    all-zero samples mean it passed, and no samples means no data.

    Args:
        prometheus_result: List of Prometheus series dicts, each carrying
            either a range-query ``values`` list of (timestamp, value) pairs
            or an instant-query ``value`` pair.

    Returns:
        True if every sample across all series passes, False if any sample
        fails (or an instant value is unparsable), None if no series carried
        samples at all.
    """
    if not prometheus_result:
        return None
    has_samples = False
    for series in prometheus_result:
        if "values" in series:
            has_samples = True
            for _ts, val in series["values"]:
                try:
                    if float(val) > 0:
                        return False
                except (TypeError, ValueError):
                    # Unparsable range samples are skipped rather than failed.
                    continue
        elif "value" in series:
            has_samples = True
            try:
                # Bug fix: previously only the FIRST instant series was
                # inspected and the function returned immediately, so a zero
                # first series masked non-zero later series. Now every series
                # is checked before declaring a pass.
                if float(series["value"][1]) != 0:
                    return False
            except (TypeError, ValueError):
                # Keep the original contract: an unparsable instant value fails.
                return False
    # If we reached here and never saw any samples, skip (no data).
    return None if not has_samples else True


def evaluate_slos(
    prom_cli: KrknPrometheus,
    slo_list: List[Dict[str, Any]],
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> Dict[str, bool]:
    """Evaluate a list of SLO expressions against Prometheus.

    Args:
        prom_cli: Configured Prometheus client.
        slo_list: List of dicts with keys ``name``, ``expr``.
        start_time: Start timestamp of the evaluation window.
        end_time: End timestamp of the evaluation window.

    Returns:
        Mapping name -> bool. True means the SLO passed; False means it
        failed, or its query raised an error. An SLO whose query returned no
        data is reported as passed (the alert condition never fired).
    """
    results: Dict[str, bool] = {}
    logging.info("Evaluating %d SLOs over window %s %s", len(slo_list), start_time, end_time)
    for slo in slo_list:
        expr = slo["expr"]
        name = slo["name"]
        try:
            response = prom_cli.process_prom_query_in_range(
                expr,
                start_time=start_time,
                end_time=end_time,
            )
            passed = slo_passed(response)
            if passed is None:
                # Absence of data indicates the condition did not trigger; treat as pass.
                logging.debug("SLO '%s' query returned no data; assuming pass.", name)
                results[name] = True
            else:
                results[name] = passed
        except Exception as exc:
            # A broken query counts as a failed SLO rather than aborting the run.
            logging.error("PromQL query failed for SLO '%s': %s", name, exc)
            results[name] = False
    return results

View File

@@ -0,0 +1,4 @@
"""krkn.resiliency package public interface."""
from .resiliency import Resiliency # noqa: F401
from .score import calculate_resiliency_score # noqa: F401

View File

@@ -0,0 +1,366 @@
"""Resiliency evaluation orchestrator for Krkn chaos runs.
This module provides the `Resiliency` class which loads the canonical
`alerts.yaml`, executes every SLO expression against Prometheus in the
chaos-test time window, determines pass/fail status and calculates an
overall resiliency score using the generic weighted model implemented
in `krkn.resiliency.score`.
"""
from __future__ import annotations
import datetime
import logging
import os
from typing import Dict, List, Any, Optional, Tuple
import yaml
import json
import dataclasses
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
from krkn.prometheus.collector import evaluate_slos
from krkn.resiliency.score import calculate_resiliency_score
class Resiliency:
    """Central orchestrator for resiliency scoring.

    Loads SLO definitions from an alerts YAML file, evaluates them against
    Prometheus over one or more scenario windows, and aggregates everything
    into a weighted resiliency score plus summary and detailed reports.
    """

    def __init__(self, alerts_yaml_path: str):
        """Load and normalise the SLO configuration.

        Args:
            alerts_yaml_path: Path to the alerts/SLO YAML file.

        Raises:
            FileNotFoundError: If the alerts file does not exist.
            ValueError: If the YAML content cannot be normalised into SLOs.
        """
        if not os.path.exists(alerts_yaml_path):
            raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
        with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
            raw_yaml_data = yaml.safe_load(fp)
        logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
        self._slos = self._normalise_alerts(raw_yaml_data)
        self._results: Dict[str, bool] = {}
        self._score: Optional[int] = None
        self._breakdown: Optional[Dict[str, int]] = None
        self._health_check_results: Dict[str, bool] = {}
        # One entry per evaluated scenario window (see add_scenario_report).
        self.scenario_reports: List[Dict[str, Any]] = []
        # Both populated by finalize_report().
        self.summary: Optional[Dict[str, Any]] = None
        self.detailed_report: Optional[Dict[str, Any]] = None

    def _slo_definitions(self) -> Dict[str, Dict[str, Any]]:
        """Return SLO definitions keyed by name, in the scorer's expected shape.

        Extracted helper: this mapping was previously built inline (and
        identically) in calculate_score, add_scenario_report and
        finalize_report.
        """
        return {
            slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")}
            for slo in self._slos
        }

    # ---------------------------------------------------------------------
    # Public API
    # ---------------------------------------------------------------------
    def calculate_score(
        self,
        *,
        health_check_results: Optional[Dict[str, bool]] = None,
    ) -> int:
        """Calculate the resiliency score using collected SLO results.

        Args:
            health_check_results: Optional mapping of custom health-check
                name -> bool pass flag; merged into the score as critical SLOs.

        Returns:
            Integer resiliency score (0-100).
        """
        score, breakdown = calculate_resiliency_score(
            slo_definitions=self._slo_definitions(),
            prometheus_results=self._results,
            health_check_results=health_check_results or {},
        )
        self._score = score
        self._breakdown = breakdown
        self._health_check_results = health_check_results or {}
        return score

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary ready for telemetry output.

        Raises:
            RuntimeError: If calculate_score() has not been called yet.
        """
        if self._score is None:
            raise RuntimeError("calculate_score() must be called before to_dict()")
        return {
            "score": self._score,
            "breakdown": self._breakdown,
            "slo_results": self._results,
            "health_check_results": getattr(self, "_health_check_results", {}),
        }

    # ------------------------------------------------------------------
    # Scenario-based resiliency evaluation
    # ------------------------------------------------------------------
    def add_scenario_report(
        self,
        *,
        scenario_name: str,
        prom_cli: KrknPrometheus,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        weight: float | int = 1,
        health_check_results: Optional[Dict[str, bool]] = None,
    ) -> int:
        """
        Evaluate SLOs for a single scenario window and store the result.

        Args:
            scenario_name: Human-friendly scenario identifier.
            prom_cli: Initialized KrknPrometheus instance.
            start_time: Window start.
            end_time: Window end.
            weight: Weight to use for the final weighted average calculation.
            health_check_results: Optional mapping of custom health-check name -> bool.

        Returns:
            The calculated integer resiliency score (0-100) for this scenario.
        """
        slo_results = evaluate_slos(
            prom_cli=prom_cli,
            slo_list=self._slos,
            start_time=start_time,
            end_time=end_time,
        )
        score, breakdown = calculate_resiliency_score(
            slo_definitions=self._slo_definitions(),
            prometheus_results=slo_results,
            health_check_results=health_check_results or {},
        )
        self.scenario_reports.append(
            {
                "name": scenario_name,
                "window": {
                    "start": start_time.isoformat(),
                    "end": end_time.isoformat(),
                },
                "score": score,
                "weight": weight,
                "breakdown": breakdown,
                "slo_results": slo_results,
                "health_check_results": health_check_results or {},
            }
        )
        return score

    def finalize_report(
        self,
        *,
        prom_cli: KrknPrometheus,
        total_start_time: datetime.datetime,
        total_end_time: datetime.datetime,
    ) -> None:
        """Aggregate scenario scores and evaluate SLOs over the full window.

        Populates ``self.summary`` and ``self.detailed_report``.

        Raises:
            RuntimeError: If no scenario reports were added.
        """
        if not self.scenario_reports:
            raise RuntimeError("No scenario reports added nothing to finalize")
        # ---------------- Weighted average (primary resiliency_score) ----------
        total_weight = sum(rep["weight"] for rep in self.scenario_reports)
        resiliency_score = int(
            sum(rep["score"] * rep["weight"] for rep in self.scenario_reports) / total_weight
        )
        # ---------------- Overall SLO evaluation across full test window ------
        full_slo_results = evaluate_slos(
            prom_cli=prom_cli,
            slo_list=self._slos,
            start_time=total_start_time,
            end_time=total_end_time,
        )
        _overall_score, full_breakdown = calculate_resiliency_score(
            slo_definitions=self._slo_definitions(),
            prometheus_results=full_slo_results,
            health_check_results={},
        )
        self.summary = {
            "scenarios": {rep["name"]: rep["score"] for rep in self.scenario_reports},
            "resiliency_score": resiliency_score,
            "passed_slos": full_breakdown.get("passed", 0),
            "total_slos": full_breakdown.get("passed", 0) + full_breakdown.get("failed", 0),
        }
        # Detailed report currently limited to per-scenario information;
        # system stability section removed.
        self.detailed_report = {
            "scenarios": self.scenario_reports,
        }

    def get_summary(self) -> Dict[str, Any]:
        """Return the concise resiliency_summary structure."""
        if not hasattr(self, "summary") or self.summary is None:
            raise RuntimeError("finalize_report() must be called first")
        return self.summary

    def get_detailed_report(self) -> Dict[str, Any]:
        """Return the full resiliency-report structure."""
        if not hasattr(self, "detailed_report") or self.detailed_report is None:
            raise RuntimeError("finalize_report() must be called first")
        return self.detailed_report

    @staticmethod
    def compact_breakdown(report: Dict[str, Any]) -> Dict[str, int]:
        """Return a compact summary dict for a single scenario report.

        Falls back to zeros when the report is missing the expected keys.
        """
        try:
            passed = report["breakdown"]["passed"]
            failed = report["breakdown"]["failed"]
            score_val = report["score"]
        except Exception:
            passed = report.get("breakdown", {}).get("passed", 0)
            failed = report.get("breakdown", {}).get("failed", 0)
            score_val = report.get("score", 0)
        return {
            "resiliency_score": score_val,
            "passed_slos": passed,
            "total_slos": passed + failed,
        }

    def attach_compact_to_telemetry(self, chaos_telemetry: ChaosRunTelemetry) -> None:
        """Embed per-scenario compact resiliency reports into a ChaosRunTelemetry instance.

        Telemetry scenario entries may be dicts or dataclass-like objects; the
        latter are converted to dicts so the report can be attached uniformly.
        """
        score_map = {
            rep["name"]: self.compact_breakdown(rep) for rep in self.scenario_reports
        }
        new_scenarios = []
        for item in getattr(chaos_telemetry, "scenarios", []):
            if isinstance(item, dict):
                name = item.get("scenario")
                if name in score_map:
                    item["resiliency_report"] = score_map[name]
                new_scenarios.append(item)
            else:
                name = getattr(item, "scenario", None)
                try:
                    item_dict = dataclasses.asdict(item)
                except Exception:
                    # Not a dataclass: shallow-copy public, non-callable attributes.
                    item_dict = {
                        k: getattr(item, k)
                        for k in dir(item)
                        if not k.startswith("__") and not callable(getattr(item, k))
                    }
                if name in score_map:
                    item_dict["resiliency_report"] = score_map[name]
                new_scenarios.append(item_dict)
        chaos_telemetry.scenarios = new_scenarios

    def add_scenario_reports(
        self,
        *,
        scenario_telemetries,
        prom_cli: KrknPrometheus,
        scenario_type: str,
        batch_start_dt: datetime.datetime,
        batch_end_dt: datetime.datetime,
        weight: int | float = 1,
    ) -> None:
        """Evaluate SLOs for every telemetry item belonging to a scenario window,
        store the result and enrich the telemetry list with a compact resiliency breakdown.

        Args:
            scenario_telemetries: Iterable with telemetry objects/dicts for the
                current scenario batch window.
            prom_cli: Pre-configured :class:`KrknPrometheus` instance.
            scenario_type: Fallback scenario identifier in case individual
                telemetry items do not provide one.
            batch_start_dt: Fallback start timestamp for the batch window.
            batch_end_dt: Fallback end timestamp for the batch window.
            weight: Weight to assign to every scenario when calculating the final
                weighted average.
        """
        for tel in scenario_telemetries:
            try:
                # -------- Extract timestamps & scenario name --------------------
                if isinstance(tel, dict):
                    st_ts = tel.get("start_timestamp")
                    en_ts = tel.get("end_timestamp")
                    scen_name = tel.get("scenario", scenario_type)
                else:
                    st_ts = getattr(tel, "start_timestamp", None)
                    en_ts = getattr(tel, "end_timestamp", None)
                    scen_name = getattr(tel, "scenario", scenario_type)
                if st_ts and en_ts:
                    # NOTE(review): fromtimestamp() yields local time while the
                    # batch fallbacks are produced by utcnow() — confirm the
                    # telemetry epochs and Prometheus share a timezone.
                    st_dt = datetime.datetime.fromtimestamp(int(st_ts))
                    en_dt = datetime.datetime.fromtimestamp(int(en_ts))
                else:
                    st_dt = batch_start_dt
                    en_dt = batch_end_dt
                # -------- Calculate resiliency score for the scenario -----------
                self.add_scenario_report(
                    scenario_name=str(scen_name),
                    prom_cli=prom_cli,
                    start_time=st_dt,
                    end_time=en_dt,
                    weight=weight,
                    health_check_results=None,
                )
                compact = self.compact_breakdown(self.scenario_reports[-1])
                if isinstance(tel, dict):
                    tel["resiliency_report"] = compact
                else:
                    setattr(tel, "resiliency_report", compact)
            except Exception as exc:
                # Best-effort: one bad telemetry item must not abort the batch.
                logging.error("Resiliency per-scenario evaluation failed: %s", exc)

    def finalize_and_save(
        self,
        *,
        prom_cli: KrknPrometheus,
        total_start_time: datetime.datetime,
        total_end_time: datetime.datetime,
        run_mode: str = "standalone",
        detailed_path: str = "resiliency-report.json",
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Finalize resiliency scoring, persist reports and return them.

        Args:
            prom_cli: Pre-configured KrknPrometheus instance.
            total_start_time: Start time for the full test window.
            total_end_time: End time for the full test window.
            run_mode: "controller" or "standalone" mode.
            detailed_path: Output file path used in standalone mode.

        Returns:
            Tuple of (summary, detailed_report). Both are empty dicts when
            finalization fails. (Bug fix: the method was annotated as
            returning a tuple but previously returned None on every path.)
        """
        try:
            self.finalize_report(
                prom_cli=prom_cli,
                total_start_time=total_start_time,
                total_end_time=total_end_time,
            )
            detailed = self.get_detailed_report()
            if run_mode == "controller":
                # krknctl expects the detailed report on stdout in a special format
                try:
                    detailed_json = json.dumps(detailed)
                    print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
                    logging.info("Resiliency report logged to stdout for krknctl.")
                except Exception as exc:
                    logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
            else:
                # Stand-alone mode write to files for post-run consumption
                try:
                    with open(detailed_path, "w", encoding="utf-8") as fp:
                        json.dump(detailed, fp, indent=2)
                    logging.info("Resiliency report written: %s", detailed_path)
                except Exception as io_exc:
                    logging.error("Failed to write resiliency report files: %s", io_exc)
            return self.get_summary(), detailed
        except Exception as exc:
            logging.error("Failed to finalize resiliency scoring: %s", exc)
            return {}, {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalise_alerts(raw_alerts: Any) -> List[Dict[str, Any]]:
        """Convert raw YAML alerts data into internal SLO list structure.

        Accepts either a top-level list of alert entries or a mapping with the
        list under the ``slos`` key. (Bug fix: the error message below always
        advertised the 'slos' key form, but it was never actually accepted.)

        Raises:
            ValueError: If no list of alert entries can be located.
        """
        if isinstance(raw_alerts, dict) and "slos" in raw_alerts:
            raw_alerts = raw_alerts["slos"]
        if not isinstance(raw_alerts, list):
            raise ValueError("SLO configuration must be a list under key 'slos' or top-level list")
        slos: List[Dict[str, Any]] = []
        for idx, alert in enumerate(raw_alerts):
            # Entries without both 'expr' and 'severity' are unusable; skip them.
            if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
                logging.warning("Skipping invalid alert entry at index %d: %s", idx, alert)
                continue
            name = alert.get("description") or f"slo_{idx}"
            slos.append(
                {
                    "name": name,
                    "expr": alert["expr"],
                    "severity": str(alert["severity"]).lower(),
                    "weight": alert.get("weight")
                }
            )
        return slos

76
krkn/resiliency/score.py Normal file
View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from typing import Dict, List, Tuple
# Severity -> points mapping used when an SLO carries no custom weight.
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}


class SLOResult:
    """Simple container representing evaluation outcome for a single SLO."""

    def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
        self.name = name
        self.severity = severity
        self.passed = passed
        # Optional per-SLO override; when None the severity-based weight applies.
        self._custom_weight = weight

    def weight(self, severity_weights: Dict[str, int]) -> int:
        """Return the weight for this SLO.

        Uses the custom weight if set; otherwise looks the severity up in
        *severity_weights*, falling back to the "warning" weight (or 1).
        """
        if self._custom_weight is not None:
            return self._custom_weight
        return severity_weights.get(self.severity, severity_weights.get("warning", 1))


def calculate_resiliency_score(
    slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
    prometheus_results: Dict[str, bool],
    health_check_results: Dict[str, bool],
) -> Tuple[int, Dict[str, int]]:
    """Compute a resiliency score between 0-100 based on SLO pass/fail results.

    Args:
        slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
            SLO name -> {"severity": str, "weight": int | None}.
        prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
            passed. Any SLO missing from this mapping was not evaluated (its
            query returned no data) and is excluded from the calculation.
        health_check_results: Mapping of custom health-check name -> bool pass flag.
            These checks are always treated as *critical*.

    Returns:
        Tuple containing (final_score, breakdown) where *breakdown* is a dict with
        total/lost points and the counts of passed/failed SLOs.
    """
    slo_objects: List[SLOResult] = []
    for slo_name, slo_def in slo_definitions.items():
        # Exclude SLOs that were not evaluated (query returned no data).
        # (Docstring previously claimed these were "treated as failed";
        # skipping them is the intended behavior.)
        if slo_name not in prometheus_results:
            continue
        passed = bool(prometheus_results[slo_name])
        # Support both old format (str) and new format (dict)
        if isinstance(slo_def, str):
            severity = slo_def
            slo_weight = None
        else:
            severity = slo_def.get("severity", "warning")
            slo_weight = slo_def.get("weight")
        slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
    # Health-check SLOs (by default keeping them critical)
    for hc_name, hc_passed in health_check_results.items():
        slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
    total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
    points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
    # int() truncates toward zero, so partial credit rounds down.
    score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)
    breakdown = {
        "total_points": total_points,
        "points_lost": points_lost,
        "passed": len([s for s in slo_objects if s.passed]),
        "failed": len([s for s in slo_objects if not s.passed]),
    }
    return score, breakdown

View File

@@ -12,7 +12,7 @@ import uuid
import time
import queue
import threading
from typing import Optional
from typing import Optional, Dict
from krkn import cerberus
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -21,11 +21,15 @@ from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.prometheus as prometheus_plugin
import server as server
from krkn.resiliency.resiliency import (
Resiliency
)
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.models.k8s import ResiliencyReport
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
@@ -54,6 +58,8 @@ def main(options, command: Optional[str]) -> int:
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
cfg = options.cfg
# Parse and read the config
if os.path.isfile(cfg):
@@ -65,6 +71,7 @@ def main(options, command: Optional[str]) -> int:
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
)
kraken_config = cfg
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
publish_running_status = get_yaml_item_value(
config["kraken"], "publish_kraken_status", False
@@ -86,14 +93,20 @@ def main(options, command: Optional[str]) -> int:
config["kraken"], "signal_address", "0.0.0.0"
)
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
resiliency_config = get_yaml_item_value(config,"resiliency",{})
# Determine execution mode (standalone, controller, or disabled)
run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
valid_run_modes = {"standalone", "detailed", "disabled"}
if run_mode not in valid_run_modes:
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
run_mode = "standalone"
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
prometheus_url = config["performance_monitoring"].get("prometheus_url")
prometheus_bearer_token = config["performance_monitoring"].get(
"prometheus_bearer_token"
)
prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
run_uuid = config["performance_monitoring"].get("uuid")
enable_alerts = get_yaml_item_value(
config["performance_monitoring"], "enable_alerts", False
@@ -101,6 +114,10 @@ def main(options, command: Optional[str]) -> int:
enable_metrics = get_yaml_item_value(
config["performance_monitoring"], "enable_metrics", False
)
# Default placeholder; will be overridden if a Prometheus URL is available
prometheus = None
# elastic search
enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
@@ -231,6 +248,11 @@ def main(options, command: Optional[str]) -> int:
else:
logging.info("Cluster version CRD not detected, skipping")
# Final check: ensure Prometheus URL is available; disable resiliency if not
if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
logging.warning("Prometheus URL not provided; disabling resiliency score features.")
run_mode = "disabled"
# KrknTelemetry init
telemetry_k8s = KrknTelemetryKubernetes(
safe_logger, kubecli, config["telemetry"]
@@ -251,9 +273,18 @@ def main(options, command: Optional[str]) -> int:
else:
elastic_search = None
summary = ChaosRunAlertSummary()
if enable_metrics or enable_alerts or check_critical_alerts:
if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
# Quick connectivity probe for Prometheus disable resiliency if unreachable
try:
prometheus.process_prom_query_in_range(
"up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
)
except Exception as prom_exc:
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
run_mode = "disabled"
resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
logging.info("Server URL: %s" % kubecli.get_host())
if command == "list-rollback":
@@ -369,6 +400,8 @@ def main(options, command: Optional[str]) -> int:
)
sys.exit(-1)
batch_window_start_dt = datetime.datetime.utcnow()
failed_scenarios_current, scenario_telemetries = (
scenario_plugin.run_scenarios(
run_uuid, scenarios_list, config, telemetry_ocp
@@ -376,6 +409,15 @@ def main(options, command: Optional[str]) -> int:
)
failed_post_scenarios.extend(failed_scenarios_current)
chaos_telemetry.scenarios.extend(scenario_telemetries)
batch_window_end_dt = datetime.datetime.utcnow()
if resiliency_obj:
resiliency_obj.add_scenario_reports(
scenario_telemetries=scenario_telemetries,
prom_cli=prometheus,
scenario_type=scenario_type,
batch_start_dt=batch_window_start_dt,
batch_end_dt=batch_window_end_dt,
)
post_critical_alerts = 0
if check_critical_alerts:
@@ -440,12 +482,41 @@ def main(options, command: Optional[str]) -> int:
else:
logging.info("No error logs collected during chaos run")
chaos_telemetry.error_logs = []
if resiliency_obj:
try:
resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
except Exception as exc:
logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)
if resiliency_obj:
try:
resiliency_obj.finalize_and_save(
prom_cli=prometheus,
total_start_time=datetime.datetime.fromtimestamp(start_time),
total_end_time=datetime.datetime.fromtimestamp(end_time),
run_mode=run_mode,
)
except Exception as e:
logging.error("Failed to finalize resiliency scoring: %s", e)
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
summary_dict = resiliency_obj.get_summary()
decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
json_object=summary_dict,
resiliency_score=summary_dict.get("resiliency_score", 0),
passed_slos=summary_dict.get("passed_slos", 0),
total_slos=summary_dict.get("total_slos", 0)
)
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
elastic_telemetry = ElasticChaosRunTelemetry(
chaos_run_telemetry=decoded_chaos_run_telemetry
)
result = elastic_search.push_telemetry(
decoded_chaos_run_telemetry, elastic_telemetry_index
)