Compare commits

..

1 Commit

Author SHA1 Message Date
Naga Ravi Chaitanya Elluri
31756e6d9b Add Beta features governance policy (#1185)
Introduce documentation defining Beta feature expectations, lifecycle,
user guidance, and promotion criteria to GA. This helps users understand
that Beta features are experimental and intended for early feedback.

Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2026-03-12 23:39:14 -04:00
8 changed files with 148 additions and 605 deletions

1
.gitignore vendored

@@ -17,7 +17,6 @@ __pycache__/*
kube-burner*
kube_burner*
recommender_*.json
resiliency*.json
# Project files
.ropeproject

141
BETA_FEATURE_POLICY.md Normal file

@@ -0,0 +1,141 @@
# Beta Features Policy
## Overview
Beta features provide users early access to new capabilities before they reach full stability and general availability (GA). These features allow maintainers to gather feedback, validate usability, and improve functionality based on real-world usage.
Beta features are intended for experimentation and evaluation. While they are functional, they may not yet meet the stability, performance, or backward compatibility guarantees expected from generally available features.
---
## What is a Beta Feature?
A **Beta feature** is a feature that is released for user evaluation but is still under active development and refinement.
Beta features may have the following characteristics:
- Functionally usable but still evolving
- APIs or behavior may change between releases
- Performance optimizations may still be in progress
- Documentation may be limited or evolving
- Edge cases may not be fully validated
Beta features should be considered **experimental and optional**.
---
## User Expectations
Users trying Beta features should understand the following:
- Stability is not guaranteed
- APIs and functionality may change without notice
- Backward compatibility is not guaranteed
- The feature may evolve significantly before GA
- Production use should be evaluated carefully
We strongly encourage users to provide feedback to help improve the feature before it becomes generally available.
---
## Beta Feature Identification
All Beta features are clearly identified to ensure transparency.
### In Release Notes
Beta features will be marked with a **[BETA]** tag.
Example: [BETA] Krkn Resilience Score
### In Documentation
Beta features will include a notice similar to:
> **Beta Feature**
> This feature is currently in Beta and is intended for early user feedback. Behavior, APIs, and stability may change in future releases.
---
## Feature Lifecycle
Features typically progress through the following lifecycle stages.
### 1. Development
The feature is under active development and may not yet be visible to users.
### 2. Beta
The feature is released for early adoption and feedback.
Characteristics:
- Feature is usable
- Feedback is encouraged
- Stability improvements are ongoing
### 3. Stabilization
Based on user feedback and testing, the feature is improved to meet stability and usability expectations.
### 4. General Availability (GA)
The feature is considered stable and production-ready.
GA features provide:
- Stable APIs
- Backward compatibility guarantees
- Complete documentation
- Full CI test coverage
---
## Promotion to General Availability
A Beta feature may be promoted to GA once the following criteria are met:
- Critical bugs are resolved
- Feature stability has improved through testing
- APIs and behavior are stable
- Documentation is complete
- Community feedback has been incorporated
The promotion will be announced in the release notes.
Example: Feature promoted from Beta to GA
---
## Deprecation of Beta Features
In some cases, a Beta feature may be redesigned or discontinued.
If this happens:
- The feature will be marked as **Deprecated**
- A removal timeline will be provided
- Alternative approaches will be documented when possible
Example: [DEPRECATED] This feature will be removed in a future release.
---
## Contributing Feedback
User feedback plays a critical role in improving Beta features.
Users are encouraged to report:
- Bugs
- Usability issues
- Performance concerns
- Feature suggestions
Feedback can be submitted through:
- Krkn GitHub Issues
- Krkn GitHub Discussions
- Krkn Community channels
Please include **Beta feature context** when reporting issues.
Your feedback helps guide the roadmap and ensures features are production-ready before GA.


@@ -131,5 +131,4 @@ kubevirt_checks: # Utilizing virt che
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
node_names: ""
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False


@@ -1,79 +0,0 @@
from __future__ import annotations
import datetime
import logging
from typing import Dict, Any, List, Optional
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
# -----------------------------------------------------------------------------
# SLO evaluation helpers (used by krkn.resiliency)
# -----------------------------------------------------------------------------
def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
if not prometheus_result:
return None
has_samples = False
for series in prometheus_result:
if "values" in series:
has_samples = True
for _ts, val in series["values"]:
try:
if float(val) > 0:
return False
except (TypeError, ValueError):
continue
elif "value" in series:
has_samples = True
try:
return float(series["value"][1]) == 0
except (TypeError, ValueError):
return False
# If we reached here and never saw any samples, skip
return None if not has_samples else True
def evaluate_slos(
prom_cli: KrknPrometheus,
slo_list: List[Dict[str, Any]],
start_time: datetime.datetime,
end_time: datetime.datetime,
) -> Dict[str, bool]:
"""Evaluate a list of SLO expressions against Prometheus.
Args:
prom_cli: Configured Prometheus client.
slo_list: List of dicts with keys ``name``, ``expr``.
start_time: Start timestamp.
end_time: End timestamp.
Returns:
Mapping name -> bool indicating pass status.
True means the SLO passed; False means it failed.
"""
results: Dict[str, bool] = {}
logging.info("Evaluating %d SLOs over window %s to %s", len(slo_list), start_time, end_time)
for slo in slo_list:
expr = slo["expr"]
name = slo["name"]
try:
response = prom_cli.process_prom_query_in_range(
expr,
start_time=start_time,
end_time=end_time,
)
passed = slo_passed(response)
if passed is None:
# Absence of data indicates the condition did not trigger; treat as pass.
logging.debug("SLO '%s' query returned no data; assuming pass.", name)
results[name] = True
else:
results[name] = passed
except Exception as exc:
logging.error("PromQL query failed for SLO '%s': %s", name, exc)
results[name] = False
return results
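
The pass/fail rules in `slo_passed()` above can be sketched as a standalone function (a minimal reimplementation for illustration, not the krkn code itself): any sample greater than zero fails the SLO, an all-zero series passes, and an empty result means "no data" and is skipped.

```python
# Minimal sketch of the slo_passed() decision rules, assuming the
# Prometheus result shapes shown above: range-query series carry a
# "values" list of (timestamp, value) pairs, instant queries a single
# "value" pair.
from typing import Any, List, Optional

def slo_passed_sketch(prometheus_result: List[Any]) -> Optional[bool]:
    if not prometheus_result:
        return None  # no series at all: skip the SLO
    has_samples = False
    for series in prometheus_result:
        if "values" in series:  # range query
            has_samples = True
            for _ts, val in series["values"]:
                try:
                    if float(val) > 0:
                        return False  # any firing sample fails the SLO
                except (TypeError, ValueError):
                    continue
        elif "value" in series:  # instant query
            has_samples = True
            try:
                return float(series["value"][1]) == 0
            except (TypeError, ValueError):
                return False
    return None if not has_samples else True

print(slo_passed_sketch([{"values": [(0, "0"), (60, "2")]}]))  # False
print(slo_passed_sketch([{"values": [(0, "0"), (60, "0")]}]))  # True
print(slo_passed_sketch([]))                                   # None
```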


@@ -1,4 +0,0 @@
"""krkn.resiliency package public interface."""
from .resiliency import Resiliency # noqa: F401
from .score import calculate_resiliency_score # noqa: F401


@@ -1,366 +0,0 @@
"""Resiliency evaluation orchestrator for Krkn chaos runs.
This module provides the `Resiliency` class which loads the canonical
`alerts.yaml`, executes every SLO expression against Prometheus in the
chaos-test time window, determines pass/fail status and calculates an
overall resiliency score using the generic weighted model implemented
in `krkn.resiliency.score`.
"""
from __future__ import annotations
import datetime
import logging
import os
from typing import Dict, List, Any, Optional, Tuple
import yaml
import json
import dataclasses
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
from krkn.prometheus.collector import evaluate_slos
from krkn.resiliency.score import calculate_resiliency_score
class Resiliency:
"""Central orchestrator for resiliency scoring."""
def __init__(self, alerts_yaml_path: str):
if not os.path.exists(alerts_yaml_path):
raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
raw_yaml_data = yaml.safe_load(fp)
logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
self._slos = self._normalise_alerts(raw_yaml_data)
self._results: Dict[str, bool] = {}
self._score: Optional[int] = None
self._breakdown: Optional[Dict[str, int]] = None
self._health_check_results: Dict[str, bool] = {}
self.scenario_reports: List[Dict[str, Any]] = []
self.summary: Optional[Dict[str, Any]] = None
self.detailed_report: Optional[Dict[str, Any]] = None
# ---------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------
def calculate_score(
self,
*,
health_check_results: Optional[Dict[str, bool]] = None,
) -> int:
"""Calculate the resiliency score using collected SLO results."""
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
score, breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=self._results,
health_check_results=health_check_results or {},
)
self._score = score
self._breakdown = breakdown
self._health_check_results = health_check_results or {}
return score
def to_dict(self) -> Dict[str, Any]:
"""Return a dictionary ready for telemetry output."""
if self._score is None:
raise RuntimeError("calculate_score() must be called before to_dict()")
return {
"score": self._score,
"breakdown": self._breakdown,
"slo_results": self._results,
"health_check_results": getattr(self, "_health_check_results", {}),
}
# ------------------------------------------------------------------
# Scenario-based resiliency evaluation
# ------------------------------------------------------------------
def add_scenario_report(
self,
*,
scenario_name: str,
prom_cli: KrknPrometheus,
start_time: datetime.datetime,
end_time: datetime.datetime,
weight: float | int = 1,
health_check_results: Optional[Dict[str, bool]] = None,
) -> int:
"""
Evaluate SLOs for a single scenario window and store the result.
Args:
scenario_name: Human-friendly scenario identifier.
prom_cli: Initialized KrknPrometheus instance.
start_time: Window start.
end_time: Window end.
weight: Weight to use for the final weighted average calculation.
health_check_results: Optional mapping of custom health-check name ➡ bool.
Returns:
The calculated integer resiliency score (0-100) for this scenario.
"""
slo_results = evaluate_slos(
prom_cli=prom_cli,
slo_list=self._slos,
start_time=start_time,
end_time=end_time,
)
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
score, breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=slo_results,
health_check_results=health_check_results or {},
)
self.scenario_reports.append(
{
"name": scenario_name,
"window": {
"start": start_time.isoformat(),
"end": end_time.isoformat(),
},
"score": score,
"weight": weight,
"breakdown": breakdown,
"slo_results": slo_results,
"health_check_results": health_check_results or {},
}
)
return score
def finalize_report(
self,
*,
prom_cli: KrknPrometheus,
total_start_time: datetime.datetime,
total_end_time: datetime.datetime,
) -> None:
if not self.scenario_reports:
raise RuntimeError("No scenario reports added; nothing to finalize")
# ---------------- Weighted average (primary resiliency_score) ----------
total_weight = sum(rep["weight"] for rep in self.scenario_reports)
resiliency_score = int(
sum(rep["score"] * rep["weight"] for rep in self.scenario_reports) / total_weight
)
# ---------------- Overall SLO evaluation across full test window -----------------------------
full_slo_results = evaluate_slos(
prom_cli=prom_cli,
slo_list=self._slos,
start_time=total_start_time,
end_time=total_end_time,
)
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
_overall_score, full_breakdown = calculate_resiliency_score(
slo_definitions=slo_defs,
prometheus_results=full_slo_results,
health_check_results={},
)
self.summary = {
"scenarios": {rep["name"]: rep["score"] for rep in self.scenario_reports},
"resiliency_score": resiliency_score,
"passed_slos": full_breakdown.get("passed", 0),
"total_slos": full_breakdown.get("passed", 0) + full_breakdown.get("failed", 0),
}
# Detailed report currently limited to per-scenario information; system stability section removed
self.detailed_report = {
"scenarios": self.scenario_reports,
}
def get_summary(self) -> Dict[str, Any]:
"""Return the concise resiliency_summary structure."""
if not hasattr(self, "summary") or self.summary is None:
raise RuntimeError("finalize_report() must be called first")
return self.summary
def get_detailed_report(self) -> Dict[str, Any]:
"""Return the full resiliency-report structure."""
if not hasattr(self, "detailed_report") or self.detailed_report is None:
raise RuntimeError("finalize_report() must be called first")
return self.detailed_report
@staticmethod
def compact_breakdown(report: Dict[str, Any]) -> Dict[str, int]:
"""Return a compact summary dict for a single scenario report."""
try:
passed = report["breakdown"]["passed"]
failed = report["breakdown"]["failed"]
score_val = report["score"]
except Exception:
passed = report.get("breakdown", {}).get("passed", 0)
failed = report.get("breakdown", {}).get("failed", 0)
score_val = report.get("score", 0)
return {
"resiliency_score": score_val,
"passed_slos": passed,
"total_slos": passed + failed,
}
def attach_compact_to_telemetry(self, chaos_telemetry: ChaosRunTelemetry) -> None:
"""Embed per-scenario compact resiliency reports into a ChaosRunTelemetry instance."""
score_map = {
rep["name"]: self.compact_breakdown(rep) for rep in self.scenario_reports
}
new_scenarios = []
for item in getattr(chaos_telemetry, "scenarios", []):
if isinstance(item, dict):
name = item.get("scenario")
if name in score_map:
item["resiliency_report"] = score_map[name]
new_scenarios.append(item)
else:
name = getattr(item, "scenario", None)
try:
item_dict = dataclasses.asdict(item)
except Exception:
item_dict = {
k: getattr(item, k)
for k in dir(item)
if not k.startswith("__") and not callable(getattr(item, k))
}
if name in score_map:
item_dict["resiliency_report"] = score_map[name]
new_scenarios.append(item_dict)
chaos_telemetry.scenarios = new_scenarios
def add_scenario_reports(
self,
*,
scenario_telemetries,
prom_cli: KrknPrometheus,
scenario_type: str,
batch_start_dt: datetime.datetime,
batch_end_dt: datetime.datetime,
weight: int | float = 1,
) -> None:
"""Evaluate SLOs for every telemetry item belonging to a scenario window,
store the result and enrich the telemetry list with a compact resiliency breakdown.
Args:
scenario_telemetries: Iterable with telemetry objects/dicts for the
current scenario batch window.
prom_cli: Pre-configured :class:`KrknPrometheus` instance.
scenario_type: Fallback scenario identifier in case individual
telemetry items do not provide one.
batch_start_dt: Fallback start timestamp for the batch window.
batch_end_dt: Fallback end timestamp for the batch window.
weight: Weight to assign to every scenario when calculating the final
weighted average.
"""
for tel in scenario_telemetries:
try:
# -------- Extract timestamps & scenario name --------------------
if isinstance(tel, dict):
st_ts = tel.get("start_timestamp")
en_ts = tel.get("end_timestamp")
scen_name = tel.get("scenario", scenario_type)
else:
st_ts = getattr(tel, "start_timestamp", None)
en_ts = getattr(tel, "end_timestamp", None)
scen_name = getattr(tel, "scenario", scenario_type)
if st_ts and en_ts:
st_dt = datetime.datetime.fromtimestamp(int(st_ts))
en_dt = datetime.datetime.fromtimestamp(int(en_ts))
else:
st_dt = batch_start_dt
en_dt = batch_end_dt
# -------- Calculate resiliency score for the scenario -----------
self.add_scenario_report(
scenario_name=str(scen_name),
prom_cli=prom_cli,
start_time=st_dt,
end_time=en_dt,
weight=weight,
health_check_results=None,
)
compact = self.compact_breakdown(self.scenario_reports[-1])
if isinstance(tel, dict):
tel["resiliency_report"] = compact
else:
setattr(tel, "resiliency_report", compact)
except Exception as exc:
logging.error("Resiliency per-scenario evaluation failed: %s", exc)
def finalize_and_save(
self,
*,
prom_cli: KrknPrometheus,
total_start_time: datetime.datetime,
total_end_time: datetime.datetime,
run_mode: str = "standalone",
detailed_path: str = "resiliency-report.json",
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Finalize resiliency scoring, persist reports and return them.
Args:
prom_cli: Pre-configured KrknPrometheus instance.
total_start_time: Start time for the full test window.
total_end_time: End time for the full test window.
run_mode: "controller" or "standalone" mode.
Returns:
Tuple of (summary, detailed_report).
"""
try:
self.finalize_report(
prom_cli=prom_cli,
total_start_time=total_start_time,
total_end_time=total_end_time,
)
detailed = self.get_detailed_report()
if run_mode == "controller":
# krknctl expects the detailed report on stdout in a special format
try:
detailed_json = json.dumps(detailed)
print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
logging.info("Resiliency report logged to stdout for krknctl.")
except Exception as exc:
logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
else:
# Stand-alone mode: write to files for post-run consumption
try:
with open(detailed_path, "w", encoding="utf-8") as fp:
json.dump(detailed, fp, indent=2)
logging.info("Resiliency report written: %s", detailed_path)
except Exception as io_exc:
logging.error("Failed to write resiliency report files: %s", io_exc)
except Exception as exc:
logging.error("Failed to finalize resiliency scoring: %s", exc)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _normalise_alerts(raw_alerts: Any) -> List[Dict[str, Any]]:
"""Convert raw YAML alerts data into internal SLO list structure."""
if not isinstance(raw_alerts, list):
raise ValueError("SLO configuration must be a top-level list of alert entries")
slos: List[Dict[str, Any]] = []
for idx, alert in enumerate(raw_alerts):
if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
logging.warning("Skipping invalid alert entry at index %d: %s", idx, alert)
continue
name = alert.get("description") or f"slo_{idx}"
slos.append(
{
"name": name,
"expr": alert["expr"],
"severity": str(alert["severity"]).lower(),
"weight": alert.get("weight")
}
)
return slos
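
The weighted-average step in `finalize_report()` above reduces per-scenario scores to a single `resiliency_score`. A standalone sketch of that arithmetic (the scenario names and numbers here are hypothetical, for illustration only):

```python
# Each scenario contributes score * weight; the result is divided by the
# total weight and truncated to an int, as in finalize_report() above.
scenario_reports = [
    {"name": "pod-delete", "score": 100, "weight": 1},   # hypothetical
    {"name": "node-cpu-hog", "score": 50, "weight": 2},  # hypothetical
]
total_weight = sum(rep["weight"] for rep in scenario_reports)
resiliency_score = int(
    sum(rep["score"] * rep["weight"] for rep in scenario_reports) / total_weight
)
print(resiliency_score)  # (100*1 + 50*2) / 3 -> 66
```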


@@ -1,76 +0,0 @@
from __future__ import annotations
from typing import Dict, List, Tuple
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}
class SLOResult:
"""Simple container representing evaluation outcome for a single SLO."""
def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
self.name = name
self.severity = severity
self.passed = passed
self._custom_weight = weight
def weight(self, severity_weights: Dict[str, int]) -> int:
"""Return the weight for this SLO. Uses custom weight if set, otherwise uses severity-based weight."""
if self._custom_weight is not None:
return self._custom_weight
return severity_weights.get(self.severity, severity_weights.get("warning", 1))
def calculate_resiliency_score(
slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
prometheus_results: Dict[str, bool],
health_check_results: Dict[str, bool],
) -> Tuple[int, Dict[str, int]]:
"""Compute a resiliency score between 0-100 based on SLO pass/fail results.
Args:
slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
SLO name -> {"severity": str, "weight": int | None}.
prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
passed. Any SLO missing in this mapping is treated as failed.
health_check_results: Mapping of custom health-check name -> bool pass flag.
These checks are always treated as *critical*.
Returns:
Tuple containing (final_score, breakdown) where *breakdown* is a dict with
the counts of passed/failed SLOs per severity.
"""
slo_objects: List[SLOResult] = []
for slo_name, slo_def in slo_definitions.items():
# Exclude SLOs that were not evaluated (query returned no data)
if slo_name not in prometheus_results:
continue
passed = bool(prometheus_results[slo_name])
# Support both old format (str) and new format (dict)
if isinstance(slo_def, str):
severity = slo_def
slo_weight = None
else:
severity = slo_def.get("severity", "warning")
slo_weight = slo_def.get("weight")
slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
# Health-check SLOs (by default keeping them critical)
for hc_name, hc_passed in health_check_results.items():
slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)
breakdown = {
"total_points": total_points,
"points_lost": points_lost,
"passed": len([s for s in slo_objects if s.passed]),
"failed": len([s for s in slo_objects if not s.passed]),
}
return score, breakdown
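
A worked numeric check of the scoring model above, under its default severity weights (critical=3, warning=1); the SLO names are hypothetical placeholders:

```python
# score = (total_points - points_lost) / total_points * 100, truncated to int,
# mirroring calculate_resiliency_score() above.
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}

slos = [  # (name, severity, passed) -- hypothetical SLOs
    ("apiserver-latency", "critical", True),
    ("etcd-leader-changes", "critical", False),
    ("pod-restarts", "warning", True),
]
total_points = sum(DEFAULT_WEIGHTS[sev] for _, sev, _ in slos)            # 3+3+1 = 7
points_lost = sum(DEFAULT_WEIGHTS[sev] for _, sev, ok in slos if not ok)  # 3
score = 0 if total_points == 0 else int((total_points - points_lost) / total_points * 100)
print(score)  # 4/7 * 100 -> 57
```

Note how a single failed critical SLO costs three times as many points as a failed warning would, which is the intent of the severity weighting.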


@@ -12,7 +12,7 @@ import uuid
import time
import queue
import threading
from typing import Optional, Dict
from typing import Optional
from krkn import cerberus
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -21,15 +21,11 @@ from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.prometheus as prometheus_plugin
import server as server
from krkn.resiliency.resiliency import (
Resiliency
)
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.models.k8s import ResiliencyReport
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
@@ -58,8 +54,6 @@ def main(options, command: Optional[str]) -> int:
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
cfg = options.cfg
# Parse and read the config
if os.path.isfile(cfg):
@@ -71,7 +65,6 @@ def main(options, command: Optional[str]) -> int:
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
)
kraken_config = cfg
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
publish_running_status = get_yaml_item_value(
config["kraken"], "publish_kraken_status", False
@@ -93,20 +86,14 @@ def main(options, command: Optional[str]) -> int:
config["kraken"], "signal_address", "0.0.0.0"
)
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
resiliency_config = get_yaml_item_value(config,"resiliency",{})
# Determine execution mode (standalone, detailed, or disabled)
run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
valid_run_modes = {"standalone", "detailed", "disabled"}
if run_mode not in valid_run_modes:
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
run_mode = "standalone"
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
prometheus_url = config["performance_monitoring"].get("prometheus_url")
prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
prometheus_bearer_token = config["performance_monitoring"].get(
"prometheus_bearer_token"
)
run_uuid = config["performance_monitoring"].get("uuid")
enable_alerts = get_yaml_item_value(
config["performance_monitoring"], "enable_alerts", False
@@ -114,10 +101,6 @@ def main(options, command: Optional[str]) -> int:
enable_metrics = get_yaml_item_value(
config["performance_monitoring"], "enable_metrics", False
)
# Default placeholder; will be overridden if a Prometheus URL is available
prometheus = None
# elastic search
enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
@@ -248,11 +231,6 @@ def main(options, command: Optional[str]) -> int:
else:
logging.info("Cluster version CRD not detected, skipping")
# Final check: ensure Prometheus URL is available; disable resiliency if not
if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
logging.warning("Prometheus URL not provided; disabling resiliency score features.")
run_mode = "disabled"
# KrknTelemetry init
telemetry_k8s = KrknTelemetryKubernetes(
safe_logger, kubecli, config["telemetry"]
@@ -273,18 +251,9 @@ def main(options, command: Optional[str]) -> int:
else:
elastic_search = None
summary = ChaosRunAlertSummary()
if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
if enable_metrics or enable_alerts or check_critical_alerts:
prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
# Quick connectivity probe for Prometheus; disable resiliency if it is unreachable
try:
prometheus.process_prom_query_in_range(
"up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
)
except Exception as prom_exc:
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
run_mode = "disabled"
resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
logging.info("Server URL: %s" % kubecli.get_host())
if command == "list-rollback":
@@ -400,8 +369,6 @@ def main(options, command: Optional[str]) -> int:
)
sys.exit(-1)
batch_window_start_dt = datetime.datetime.utcnow()
failed_scenarios_current, scenario_telemetries = (
scenario_plugin.run_scenarios(
run_uuid, scenarios_list, config, telemetry_ocp
@@ -409,15 +376,6 @@ def main(options, command: Optional[str]) -> int:
)
failed_post_scenarios.extend(failed_scenarios_current)
chaos_telemetry.scenarios.extend(scenario_telemetries)
batch_window_end_dt = datetime.datetime.utcnow()
if resiliency_obj:
resiliency_obj.add_scenario_reports(
scenario_telemetries=scenario_telemetries,
prom_cli=prometheus,
scenario_type=scenario_type,
batch_start_dt=batch_window_start_dt,
batch_end_dt=batch_window_end_dt,
)
post_critical_alerts = 0
if check_critical_alerts:
@@ -482,41 +440,12 @@ def main(options, command: Optional[str]) -> int:
else:
logging.info("No error logs collected during chaos run")
chaos_telemetry.error_logs = []
if resiliency_obj:
try:
resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
except Exception as exc:
logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)
if resiliency_obj:
try:
resiliency_obj.finalize_and_save(
prom_cli=prometheus,
total_start_time=datetime.datetime.fromtimestamp(start_time),
total_end_time=datetime.datetime.fromtimestamp(end_time),
run_mode=run_mode,
)
except Exception as e:
logging.error("Failed to finalize resiliency scoring: %s", e)
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
summary_dict = resiliency_obj.get_summary()
decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
json_object=summary_dict,
resiliency_score=summary_dict.get("resiliency_score", 0),
passed_slos=summary_dict.get("passed_slos", 0),
total_slos=summary_dict.get("total_slos", 0)
)
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
elastic_telemetry = ElasticChaosRunTelemetry(
chaos_run_telemetry=decoded_chaos_run_telemetry
)
result = elastic_search.push_telemetry(
decoded_chaos_run_telemetry, elastic_telemetry_index
)