mirror of https://github.com/krkn-chaos/krkn.git
synced 2026-03-16 08:30:37 +00:00

Compare commits: v5.0.1-bet ... main (1 commit)

| Author | SHA1 | Date |
|---|---|---|
| | 31756e6d9b | |

.gitignore (vendored)
@@ -17,7 +17,6 @@ __pycache__/*
kube-burner*
kube_burner*
recommender_*.json
resiliency*.json

# Project files
.ropeproject
BETA_FEATURE_POLICY.md (new file, 141 lines)
@@ -0,0 +1,141 @@
# Beta Features Policy

## Overview

Beta features provide users early access to new capabilities before they reach full stability and general availability (GA). These features allow maintainers to gather feedback, validate usability, and improve functionality based on real-world usage.

Beta features are intended for experimentation and evaluation. While they are functional, they may not yet meet the stability, performance, or backward compatibility guarantees expected from generally available features.

---

## What is a Beta Feature

A **Beta feature** is a feature that is released for user evaluation but is still under active development and refinement.

Beta features may have the following characteristics:

- Functionally usable but still evolving
- APIs or behavior may change between releases
- Performance optimizations may still be in progress
- Documentation may be limited or evolving
- Edge cases may not be fully validated

Beta features should be considered **experimental and optional**.

---

## User Expectations

Users trying Beta features should understand the following:

- Stability is not guaranteed
- APIs and functionality may change without notice
- Backward compatibility is not guaranteed
- The feature may evolve significantly before GA
- Production use should be evaluated carefully

We strongly encourage users to provide feedback to help improve the feature before it becomes generally available.

---

## Beta Feature Identification

All Beta features are clearly identified to ensure transparency.

### In Release Notes

Beta features will be marked with a **[BETA]** tag.

Example: [BETA] Krkn Resilience Score

### In Documentation

Beta features will include a notice similar to:

> **Beta Feature**
> This feature is currently in Beta and is intended for early user feedback. Behavior, APIs, and stability may change in future releases.

---

## Feature Lifecycle

Features typically progress through the following lifecycle stages.

### 1. Development

The feature is under active development and may not yet be visible to users.

### 2. Beta

The feature is released for early adoption and feedback.

Characteristics:

- Feature is usable
- Feedback is encouraged
- Stability improvements are ongoing

### 3. Stabilization

Based on user feedback and testing, the feature is improved to meet stability and usability expectations.

### 4. General Availability (GA)

The feature is considered stable and production-ready.

GA features provide:

- Stable APIs
- Backward compatibility guarantees
- Complete documentation
- Full CI test coverage

---

## Promotion to General Availability

A Beta feature may be promoted to GA once the following criteria are met:

- Critical bugs are resolved
- Feature stability has improved through testing
- APIs and behavior are stable
- Documentation is complete
- Community feedback has been incorporated

The promotion will be announced in the release notes.

Example: Feature promoted from Beta to GA

---

## Deprecation of Beta Features

In some cases, a Beta feature may be redesigned or discontinued.

If this happens:

- The feature will be marked as **Deprecated**
- A removal timeline will be provided
- Alternative approaches will be documented when possible

Example: [DEPRECATED] This feature will be removed in a future release.

---

## Contributing Feedback

User feedback plays a critical role in improving Beta features.

Users are encouraged to report:

- Bugs
- Usability issues
- Performance concerns
- Feature suggestions

Feedback can be submitted through:

- Krkn GitHub Issues
- Krkn GitHub Discussions
- Krkn Community channels

Please include **Beta feature context** when reporting issues.
Your feedback helps guide the roadmap and ensures features are production-ready before GA.
@@ -131,5 +131,4 @@ kubevirt_checks: # Utilizing virt che
    disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
    ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
    node_names: ""
    exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False

    exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
@@ -1,79 +0,0 @@
from __future__ import annotations

import datetime
import logging
from typing import Dict, Any, List, Optional

from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus


# -----------------------------------------------------------------------------
# SLO evaluation helpers (used by krkn.resiliency)
# -----------------------------------------------------------------------------


def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
    """Return True if no sample fired, False if any sample is positive,
    and None when the query returned no data at all."""
    if not prometheus_result:
        return None
    has_samples = False
    for series in prometheus_result:
        if "values" in series:
            has_samples = True
            for _ts, val in series["values"]:
                try:
                    if float(val) > 0:
                        return False
                except (TypeError, ValueError):
                    continue
        elif "value" in series:
            has_samples = True
            try:
                return float(series["value"][1]) == 0
            except (TypeError, ValueError):
                return False

    # If we reached here and never saw any samples, skip the SLO.
    return None if not has_samples else True


def evaluate_slos(
    prom_cli: KrknPrometheus,
    slo_list: List[Dict[str, Any]],
    start_time: datetime.datetime,
    end_time: datetime.datetime,
) -> Dict[str, bool]:
    """Evaluate a list of SLO expressions against Prometheus.

    Args:
        prom_cli: Configured Prometheus client.
        slo_list: List of dicts with keys ``name`` and ``expr``.
        start_time: Start timestamp.
        end_time: End timestamp.

    Returns:
        Mapping of SLO name -> bool; True means the SLO passed,
        False means it failed.
    """
    results: Dict[str, bool] = {}
    logging.info("Evaluating %d SLOs over window %s - %s", len(slo_list), start_time, end_time)
    for slo in slo_list:
        expr = slo["expr"]
        name = slo["name"]
        try:
            response = prom_cli.process_prom_query_in_range(
                expr,
                start_time=start_time,
                end_time=end_time,
            )

            passed = slo_passed(response)
            if passed is None:
                # Absence of data indicates the condition did not trigger; treat as pass.
                logging.debug("SLO '%s' query returned no data; assuming pass.", name)
                results[name] = True
            else:
                results[name] = passed
        except Exception as exc:
            logging.error("PromQL query failed for SLO '%s': %s", name, exc)
            results[name] = False
    return results
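The pass/fail semantics of the deleted `slo_passed` helper can be exercised standalone. The sketch below mirrors the logic from the code above; the Prometheus-style result shapes (`values` for range queries, `value` for instant vectors) are illustrative inputs:

```python
from typing import Any, List, Optional


def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
    # Mirrors the deleted helper: any positive sample fails the SLO,
    # no data at all yields None (skip), otherwise the SLO passes.
    if not prometheus_result:
        return None
    has_samples = False
    for series in prometheus_result:
        if "values" in series:
            has_samples = True
            for _ts, val in series["values"]:
                try:
                    if float(val) > 0:
                        return False
                except (TypeError, ValueError):
                    continue
        elif "value" in series:
            has_samples = True
            try:
                return float(series["value"][1]) == 0
            except (TypeError, ValueError):
                return False
    return None if not has_samples else True


print(slo_passed([]))                                   # → None (no data: skip)
print(slo_passed([{"values": [[0, "0"], [60, "0"]]}]))  # → True (alert never fired)
print(slo_passed([{"values": [[0, "3"]]}]))             # → False (alert fired)
print(slo_passed([{"value": [0, "0"]}]))                # → True (instant vector)
```

Note the asymmetry: a positive sample anywhere fails immediately, while an instant-vector `value` series returns on the first series seen.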
@@ -1,4 +0,0 @@
"""krkn.resiliency package public interface."""

from .resiliency import Resiliency  # noqa: F401
from .score import calculate_resiliency_score  # noqa: F401
@@ -1,366 +0,0 @@
"""Resiliency evaluation orchestrator for Krkn chaos runs.

This module provides the `Resiliency` class which loads the canonical
`alerts.yaml`, executes every SLO expression against Prometheus in the
chaos-test time window, determines pass/fail status and calculates an
overall resiliency score using the generic weighted model implemented
in `krkn.resiliency.score`.
"""

from __future__ import annotations

import dataclasses
import datetime
import json
import logging
import os
from typing import Any, Dict, List, Optional, Tuple

import yaml
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus

from krkn.prometheus.collector import evaluate_slos
from krkn.resiliency.score import calculate_resiliency_score
class Resiliency:
    """Central orchestrator for resiliency scoring."""

    def __init__(self, alerts_yaml_path: str):
        if not os.path.exists(alerts_yaml_path):
            raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
        with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
            raw_yaml_data = yaml.safe_load(fp)
        logging.info("Loaded SLO configuration from %s", alerts_yaml_path)

        self._slos = self._normalise_alerts(raw_yaml_data)
        self._results: Dict[str, bool] = {}
        self._score: Optional[int] = None
        self._breakdown: Optional[Dict[str, int]] = None
        self._health_check_results: Dict[str, bool] = {}
        self.scenario_reports: List[Dict[str, Any]] = []
        self.summary: Optional[Dict[str, Any]] = None
        self.detailed_report: Optional[Dict[str, Any]] = None

    # ---------------------------------------------------------------------
    # Public API
    # ---------------------------------------------------------------------

    def calculate_score(
        self,
        *,
        health_check_results: Optional[Dict[str, bool]] = None,
    ) -> int:
        """Calculate the resiliency score using collected SLO results."""
        slo_defs = {
            slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")}
            for slo in self._slos
        }
        score, breakdown = calculate_resiliency_score(
            slo_definitions=slo_defs,
            prometheus_results=self._results,
            health_check_results=health_check_results or {},
        )
        self._score = score
        self._breakdown = breakdown
        self._health_check_results = health_check_results or {}
        return score

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary ready for telemetry output."""
        if self._score is None:
            raise RuntimeError("calculate_score() must be called before to_dict()")
        return {
            "score": self._score,
            "breakdown": self._breakdown,
            "slo_results": self._results,
            "health_check_results": self._health_check_results,
        }
    # ------------------------------------------------------------------
    # Scenario-based resiliency evaluation
    # ------------------------------------------------------------------
    def add_scenario_report(
        self,
        *,
        scenario_name: str,
        prom_cli: KrknPrometheus,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        weight: float | int = 1,
        health_check_results: Optional[Dict[str, bool]] = None,
    ) -> int:
        """Evaluate SLOs for a single scenario window and store the result.

        Args:
            scenario_name: Human-friendly scenario identifier.
            prom_cli: Initialized KrknPrometheus instance.
            start_time: Window start.
            end_time: Window end.
            weight: Weight to use for the final weighted average calculation.
            health_check_results: Optional mapping of custom health-check name -> bool.

        Returns:
            The calculated integer resiliency score (0-100) for this scenario.
        """
        slo_results = evaluate_slos(
            prom_cli=prom_cli,
            slo_list=self._slos,
            start_time=start_time,
            end_time=end_time,
        )
        slo_defs = {
            slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")}
            for slo in self._slos
        }
        score, breakdown = calculate_resiliency_score(
            slo_definitions=slo_defs,
            prometheus_results=slo_results,
            health_check_results=health_check_results or {},
        )
        self.scenario_reports.append(
            {
                "name": scenario_name,
                "window": {
                    "start": start_time.isoformat(),
                    "end": end_time.isoformat(),
                },
                "score": score,
                "weight": weight,
                "breakdown": breakdown,
                "slo_results": slo_results,
                "health_check_results": health_check_results or {},
            }
        )
        return score
    def finalize_report(
        self,
        *,
        prom_cli: KrknPrometheus,
        total_start_time: datetime.datetime,
        total_end_time: datetime.datetime,
    ) -> None:
        if not self.scenario_reports:
            raise RuntimeError("No scenario reports added - nothing to finalize")

        # ---------------- Weighted average (primary resiliency_score) --------
        total_weight = sum(rep["weight"] for rep in self.scenario_reports)
        resiliency_score = int(
            sum(rep["score"] * rep["weight"] for rep in self.scenario_reports) / total_weight
        )

        # ---------------- Overall SLO evaluation across full test window -----
        full_slo_results = evaluate_slos(
            prom_cli=prom_cli,
            slo_list=self._slos,
            start_time=total_start_time,
            end_time=total_end_time,
        )
        slo_defs = {
            slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")}
            for slo in self._slos
        }
        _overall_score, full_breakdown = calculate_resiliency_score(
            slo_definitions=slo_defs,
            prometheus_results=full_slo_results,
            health_check_results={},
        )

        self.summary = {
            "scenarios": {rep["name"]: rep["score"] for rep in self.scenario_reports},
            "resiliency_score": resiliency_score,
            "passed_slos": full_breakdown.get("passed", 0),
            "total_slos": full_breakdown.get("passed", 0) + full_breakdown.get("failed", 0),
        }

        # Detailed report is currently limited to per-scenario information;
        # the system-stability section was removed.
        self.detailed_report = {
            "scenarios": self.scenario_reports,
        }

    def get_summary(self) -> Dict[str, Any]:
        """Return the concise resiliency_summary structure."""
        if self.summary is None:
            raise RuntimeError("finalize_report() must be called first")
        return self.summary

    def get_detailed_report(self) -> Dict[str, Any]:
        """Return the full resiliency-report structure."""
        if self.detailed_report is None:
            raise RuntimeError("finalize_report() must be called first")
        return self.detailed_report
    @staticmethod
    def compact_breakdown(report: Dict[str, Any]) -> Dict[str, int]:
        """Return a compact summary dict for a single scenario report."""
        passed = report.get("breakdown", {}).get("passed", 0)
        failed = report.get("breakdown", {}).get("failed", 0)
        score_val = report.get("score", 0)
        return {
            "resiliency_score": score_val,
            "passed_slos": passed,
            "total_slos": passed + failed,
        }

    def attach_compact_to_telemetry(self, chaos_telemetry: ChaosRunTelemetry) -> None:
        """Embed per-scenario compact resiliency reports into a ChaosRunTelemetry instance."""
        score_map = {
            rep["name"]: self.compact_breakdown(rep) for rep in self.scenario_reports
        }
        new_scenarios = []
        for item in getattr(chaos_telemetry, "scenarios", []):
            if isinstance(item, dict):
                name = item.get("scenario")
                if name in score_map:
                    item["resiliency_report"] = score_map[name]
                new_scenarios.append(item)
            else:
                name = getattr(item, "scenario", None)
                try:
                    item_dict = dataclasses.asdict(item)
                except Exception:
                    item_dict = {
                        k: getattr(item, k)
                        for k in dir(item)
                        if not k.startswith("__") and not callable(getattr(item, k))
                    }
                if name in score_map:
                    item_dict["resiliency_report"] = score_map[name]
                new_scenarios.append(item_dict)
        chaos_telemetry.scenarios = new_scenarios
    def add_scenario_reports(
        self,
        *,
        scenario_telemetries,
        prom_cli: KrknPrometheus,
        scenario_type: str,
        batch_start_dt: datetime.datetime,
        batch_end_dt: datetime.datetime,
        weight: int | float = 1,
    ) -> None:
        """Evaluate SLOs for every telemetry item belonging to a scenario window,
        store the result and enrich the telemetry list with a compact resiliency breakdown.

        Args:
            scenario_telemetries: Iterable with telemetry objects/dicts for the
                current scenario batch window.
            prom_cli: Pre-configured :class:`KrknPrometheus` instance.
            scenario_type: Fallback scenario identifier in case individual
                telemetry items do not provide one.
            batch_start_dt: Fallback start timestamp for the batch window.
            batch_end_dt: Fallback end timestamp for the batch window.
            weight: Weight to assign to every scenario when calculating the final
                weighted average.
        """
        for tel in scenario_telemetries:
            try:
                # -------- Extract timestamps & scenario name --------------------
                if isinstance(tel, dict):
                    st_ts = tel.get("start_timestamp")
                    en_ts = tel.get("end_timestamp")
                    scen_name = tel.get("scenario", scenario_type)
                else:
                    st_ts = getattr(tel, "start_timestamp", None)
                    en_ts = getattr(tel, "end_timestamp", None)
                    scen_name = getattr(tel, "scenario", scenario_type)

                if st_ts and en_ts:
                    st_dt = datetime.datetime.fromtimestamp(int(st_ts))
                    en_dt = datetime.datetime.fromtimestamp(int(en_ts))
                else:
                    st_dt = batch_start_dt
                    en_dt = batch_end_dt

                # -------- Calculate resiliency score for the scenario -----------
                self.add_scenario_report(
                    scenario_name=str(scen_name),
                    prom_cli=prom_cli,
                    start_time=st_dt,
                    end_time=en_dt,
                    weight=weight,
                    health_check_results=None,
                )

                compact = self.compact_breakdown(self.scenario_reports[-1])
                if isinstance(tel, dict):
                    tel["resiliency_report"] = compact
                else:
                    setattr(tel, "resiliency_report", compact)
            except Exception as exc:
                logging.error("Resiliency per-scenario evaluation failed: %s", exc)
    def finalize_and_save(
        self,
        *,
        prom_cli: KrknPrometheus,
        total_start_time: datetime.datetime,
        total_end_time: datetime.datetime,
        run_mode: str = "standalone",
        detailed_path: str = "resiliency-report.json",
    ) -> Optional[Dict[str, Any]]:
        """Finalize resiliency scoring, persist the report and return it.

        Args:
            prom_cli: Pre-configured KrknPrometheus instance.
            total_start_time: Start time for the full test window.
            total_end_time: End time for the full test window.
            run_mode: "controller" or "standalone" mode.
            detailed_path: Output path for the report in standalone mode.

        Returns:
            The detailed report dict, or None if finalization failed.
        """
        try:
            self.finalize_report(
                prom_cli=prom_cli,
                total_start_time=total_start_time,
                total_end_time=total_end_time,
            )
            detailed = self.get_detailed_report()

            if run_mode == "controller":
                # krknctl expects the detailed report on stdout in a special format
                try:
                    detailed_json = json.dumps(detailed)
                    print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
                    logging.info("Resiliency report logged to stdout for krknctl.")
                except Exception as exc:
                    logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
            else:
                # Stand-alone mode: write to a file for post-run consumption
                try:
                    with open(detailed_path, "w", encoding="utf-8") as fp:
                        json.dump(detailed, fp, indent=2)
                    logging.info("Resiliency report written: %s", detailed_path)
                except Exception as io_exc:
                    logging.error("Failed to write resiliency report files: %s", io_exc)
            return detailed
        except Exception as exc:
            logging.error("Failed to finalize resiliency scoring: %s", exc)
            return None
    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalise_alerts(raw_alerts: Any) -> List[Dict[str, Any]]:
        """Convert raw YAML alerts data into the internal SLO list structure."""
        if not isinstance(raw_alerts, list):
            raise ValueError("SLO configuration must be a top-level list of alert entries")

        slos: List[Dict[str, Any]] = []
        for idx, alert in enumerate(raw_alerts):
            if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
                logging.warning("Skipping invalid alert entry at index %d: %s", idx, alert)
                continue
            name = alert.get("description") or f"slo_{idx}"
            slos.append(
                {
                    "name": name,
                    "expr": alert["expr"],
                    "severity": str(alert["severity"]).lower(),
                    "weight": alert.get("weight"),
                }
            )
        return slos
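The normalisation rules of `_normalise_alerts` can be exercised outside the class. This sketch mirrors the helper's logic; the sample alert entries are illustrative, not taken from the repository's `alerts.yaml`:

```python
# Minimal re-implementation sketch of the alert-normalisation rules above:
# entries must be dicts with "expr" and "severity"; others are skipped,
# the name falls back to "slo_<index>", and severity is lower-cased.
def normalise_alerts(raw):
    slos = []
    for idx, alert in enumerate(raw):
        if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
            continue  # the original logs a warning here
        slos.append({
            "name": alert.get("description") or f"slo_{idx}",
            "expr": alert["expr"],
            "severity": str(alert["severity"]).lower(),
            "weight": alert.get("weight"),
        })
    return slos


sample = [
    {"expr": "up == 0", "severity": "Critical", "description": "node down"},
    {"expr": "probe_success == 0"},  # missing severity -> skipped
]
print(normalise_alerts(sample))
# → [{'name': 'node down', 'expr': 'up == 0', 'severity': 'critical', 'weight': None}]
```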
@@ -1,76 +0,0 @@
from __future__ import annotations

from typing import Dict, List, Tuple

DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}


class SLOResult:
    """Simple container representing the evaluation outcome for a single SLO."""

    def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
        self.name = name
        self.severity = severity
        self.passed = passed
        self._custom_weight = weight

    def weight(self, severity_weights: Dict[str, int]) -> int:
        """Return the weight for this SLO: the custom weight if set, otherwise
        the severity-based weight."""
        if self._custom_weight is not None:
            return self._custom_weight
        return severity_weights.get(self.severity, severity_weights.get("warning", 1))


def calculate_resiliency_score(
    slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
    prometheus_results: Dict[str, bool],
    health_check_results: Dict[str, bool],
) -> Tuple[int, Dict[str, int]]:
    """Compute a resiliency score between 0 and 100 based on SLO pass/fail results.

    Args:
        slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
            SLO name -> {"severity": str, "weight": int | None}.
        prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
            passed. Any SLO missing from this mapping is treated as not evaluated
            and excluded from scoring.
        health_check_results: Mapping of custom health-check name -> bool pass flag.
            These checks are always treated as *critical*.

    Returns:
        Tuple of (final_score, breakdown), where *breakdown* is a dict with
        the point totals and the counts of passed/failed SLOs.
    """
    slo_objects: List[SLOResult] = []
    for slo_name, slo_def in slo_definitions.items():
        # Exclude SLOs that were not evaluated (query returned no data)
        if slo_name not in prometheus_results:
            continue
        passed = bool(prometheus_results[slo_name])

        # Support both the old format (str) and the new format (dict)
        if isinstance(slo_def, str):
            severity = slo_def
            slo_weight = None
        else:
            severity = slo_def.get("severity", "warning")
            slo_weight = slo_def.get("weight")

        slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))

    # Health-check SLOs are kept critical by default
    for hc_name, hc_passed in health_check_results.items():
        slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))

    total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
    points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)

    score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)

    breakdown = {
        "total_points": total_points,
        "points_lost": points_lost,
        "passed": len([s for s in slo_objects if s.passed]),
        "failed": len([s for s in slo_objects if not s.passed]),
    }
    return score, breakdown
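The weighted model above boils down to points lost over total points. A minimal sketch of the arithmetic with the default severity weights (the `weighted_score` helper name is introduced here for illustration):

```python
# Default severity weights from the module above.
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}


def weighted_score(results):
    """results: list of (severity, passed) pairs; returns an int in 0-100."""
    total = sum(DEFAULT_WEIGHTS[sev] for sev, _ in results)
    lost = sum(DEFAULT_WEIGHTS[sev] for sev, ok in results if not ok)
    return 0 if total == 0 else int((total - lost) / total * 100)


# One failed critical SLO out of two criticals and two warnings:
# total = 3 + 3 + 1 + 1 = 8, lost = 3, score = int(5 / 8 * 100) = 62
print(weighted_score([("critical", True), ("critical", False),
                      ("warning", True), ("warning", True)]))  # → 62
```

A failed critical SLO costs three times as much as a failed warning, so a single critical miss drags the score from 100 down to 62 in this example.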
@@ -12,7 +12,7 @@ import uuid
import time
import queue
import threading
from typing import Optional, Dict
from typing import Optional

from krkn import cerberus
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -21,15 +21,11 @@ from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.prometheus as prometheus_plugin
import server as server
from krkn.resiliency.resiliency import (
    Resiliency
)
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.models.k8s import ResiliencyReport
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
@@ -58,8 +54,6 @@ def main(options, command: Optional[str]) -> int:
    print(pyfiglet.figlet_format("kraken"))
    logging.info("Starting kraken")

    cfg = options.cfg
    # Parse and read the config
    if os.path.isfile(cfg):
@@ -71,7 +65,6 @@ def main(options, command: Optional[str]) -> int:
        get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
    )
    kraken_config = cfg

    chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
    publish_running_status = get_yaml_item_value(
        config["kraken"], "publish_kraken_status", False
@@ -93,20 +86,14 @@ def main(options, command: Optional[str]) -> int:
        config["kraken"], "signal_address", "0.0.0.0"
    )
    run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")

    resiliency_config = get_yaml_item_value(config, "resiliency", {})
    # Determine execution mode (standalone, controller, or disabled)
    run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
    valid_run_modes = {"standalone", "detailed", "disabled"}
    if run_mode not in valid_run_modes:
        logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
        run_mode = "standalone"
    wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
    iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
    daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)

    prometheus_url = config["performance_monitoring"].get("prometheus_url")
    prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
    prometheus_bearer_token = config["performance_monitoring"].get(
        "prometheus_bearer_token"
    )
    run_uuid = config["performance_monitoring"].get("uuid")
    enable_alerts = get_yaml_item_value(
        config["performance_monitoring"], "enable_alerts", False
@@ -114,10 +101,6 @@ def main(options, command: Optional[str]) -> int:
    enable_metrics = get_yaml_item_value(
        config["performance_monitoring"], "enable_metrics", False
    )

    # Default placeholder; will be overridden if a Prometheus URL is available
    prometheus = None
    # elastic search
    enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
    elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
@@ -248,11 +231,6 @@ def main(options, command: Optional[str]) -> int:
    else:
        logging.info("Cluster version CRD not detected, skipping")

    # Final check: ensure Prometheus URL is available; disable resiliency if not
    if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
        logging.warning("Prometheus URL not provided; disabling resiliency score features.")
        run_mode = "disabled"

    # KrknTelemetry init
    telemetry_k8s = KrknTelemetryKubernetes(
        safe_logger, kubecli, config["telemetry"]
@@ -273,18 +251,9 @@ def main(options, command: Optional[str]) -> int:
    else:
        elastic_search = None
    summary = ChaosRunAlertSummary()
    if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
    if enable_metrics or enable_alerts or check_critical_alerts:
        prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
        # Quick connectivity probe for Prometheus; disable resiliency if unreachable
        try:
            prometheus.process_prom_query_in_range(
                "up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
            )
        except Exception as prom_exc:
            logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
            run_mode = "disabled"
    resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config["performance_monitoring"], "alert_profile", "config/alerts.yaml"))
    resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None  # Initialize resiliency orchestrator

    logging.info("Server URL: %s" % kubecli.get_host())

    if command == "list-rollback":
@@ -400,8 +369,6 @@ def main(options, command: Optional[str]) -> int:
                    )
                    sys.exit(-1)

                batch_window_start_dt = datetime.datetime.utcnow()
                failed_scenarios_current, scenario_telemetries = (
                    scenario_plugin.run_scenarios(
                        run_uuid, scenarios_list, config, telemetry_ocp
@@ -409,15 +376,6 @@ def main(options, command: Optional[str]) -> int:
                )
                failed_post_scenarios.extend(failed_scenarios_current)
                chaos_telemetry.scenarios.extend(scenario_telemetries)
                batch_window_end_dt = datetime.datetime.utcnow()
                if resiliency_obj:
                    resiliency_obj.add_scenario_reports(
                        scenario_telemetries=scenario_telemetries,
                        prom_cli=prometheus,
                        scenario_type=scenario_type,
                        batch_start_dt=batch_window_start_dt,
                        batch_end_dt=batch_window_end_dt,
                    )

    post_critical_alerts = 0
    if check_critical_alerts:
@@ -482,41 +440,12 @@ def main(options, command: Optional[str]) -> int:
    else:
        logging.info("No error logs collected during chaos run")
        chaos_telemetry.error_logs = []
    if resiliency_obj:
        try:
            resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
        except Exception as exc:
            logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)

    if resiliency_obj:
        try:
            resiliency_obj.finalize_and_save(
                prom_cli=prometheus,
                total_start_time=datetime.datetime.fromtimestamp(start_time),
                total_end_time=datetime.datetime.fromtimestamp(end_time),
                run_mode=run_mode,
            )
        except Exception as e:
            logging.error("Failed to finalize resiliency scoring: %s", e)

    telemetry_json = chaos_telemetry.to_json()
    decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
    if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
        summary_dict = resiliency_obj.get_summary()
        decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
            json_object=summary_dict,
            resiliency_score=summary_dict.get("resiliency_score", 0),
            passed_slos=summary_dict.get("passed_slos", 0),
            total_slos=summary_dict.get("total_slos", 0)
        )
    chaos_output.telemetry = decoded_chaos_run_telemetry
    logging.info(f"Chaos data:\n{chaos_output.to_json()}")
    if enable_elastic:
        elastic_telemetry = ElasticChaosRunTelemetry(
            chaos_run_telemetry=decoded_chaos_run_telemetry
        )
        result = elastic_search.push_telemetry(
            decoded_chaos_run_telemetry, elastic_telemetry_index
        )
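In controller mode, `finalize_and_save` hands the detailed report to krknctl over stdout as a single prefixed JSON line. The sketch below shows that emit format and how a consumer might parse it back; the payload contents are illustrative, not taken from a real run:

```python
import json

# Illustrative payload only; a real report carries the full per-scenario breakdown.
detailed = {"scenarios": [{"name": "node-chaos", "score": 88}]}

# Controller mode emits the report as one prefixed line on stdout.
line = f"KRKN_RESILIENCY_REPORT_JSON:{json.dumps(detailed)}"
print(line)

# A consumer can split on the first ':' and parse the remainder as JSON
# (assumed parsing strategy; krknctl's actual scanner is not shown in the diff).
prefix, payload = line.split(":", 1)
parsed = json.loads(payload)
print(parsed["scenarios"][0]["score"])  # → 88
```

Keeping the report on a single line matters here: a pretty-printed dump would be interleaved with regular log output and could not be recovered by a line-oriented prefix scan.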