mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-03-21 02:47:06 +00:00
Compare commits
5 Commits
custom_wei
...
v5.0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb368a2f5c | ||
|
|
bb636cd3a9 | ||
|
|
f241b2b62f | ||
|
|
2a60a519cd | ||
|
|
31756e6d9b |
58
.github/workflows/require-docs.yml
vendored
58
.github/workflows/require-docs.yml
vendored
@@ -9,37 +9,47 @@ jobs:
|
||||
name: Check Documentation Update
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check if Documentation is Required
|
||||
id: check_docs
|
||||
run: |
|
||||
echo "Checking PR body for documentation checkbox..."
|
||||
# Read the PR body from the GitHub event payload
|
||||
if echo "${{ github.event.pull_request.body }}" | grep -qi '\[x\].*documentation needed'; then
|
||||
# Read PR body from the event JSON file — never from shell interpolation.
|
||||
# jq handles all escaping; the shell never sees the user-controlled string.
|
||||
if jq -r '.pull_request.body // ""' "$GITHUB_EVENT_PATH" | \
|
||||
grep -qi '\[x\].*documentation needed'; then
|
||||
echo "Documentation required detected."
|
||||
echo "docs_required=true" >> $GITHUB_OUTPUT
|
||||
echo "docs_required=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "Documentation not required."
|
||||
echo "docs_required=false" >> $GITHUB_OUTPUT
|
||||
echo "docs_required=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Enforce Documentation Update (if required)
|
||||
if: steps.check_docs.outputs.docs_required == 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
# Retrieve feature branch and repository owner from the GitHub context
|
||||
FEATURE_BRANCH="${{ github.head_ref }}"
|
||||
REPO_OWNER="${{ github.repository_owner }}"
|
||||
WEBSITE_REPO="website"
|
||||
echo "Searching for a merged documentation PR for feature branch: $FEATURE_BRANCH in $REPO_OWNER/$WEBSITE_REPO..."
|
||||
MERGED_PR=$(gh pr list --repo "$REPO_OWNER/$WEBSITE_REPO" --state merged --json headRefName,title,url | jq -r \
|
||||
--arg FEATURE_BRANCH "$FEATURE_BRANCH" '.[] | select(.title | contains($FEATURE_BRANCH)) | .url')
|
||||
if [[ -z "$MERGED_PR" ]]; then
|
||||
echo ":x: Documentation PR for branch '$FEATURE_BRANCH' is required and has not been merged."
|
||||
exit 1
|
||||
else
|
||||
echo ":white_check_mark: Found merged documentation PR: $MERGED_PR"
|
||||
fi
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const featureBranch = context.payload.pull_request.head.ref;
|
||||
const repoOwner = context.repo.owner;
|
||||
const websiteRepo = 'website';
|
||||
|
||||
core.info(`Searching for a merged documentation PR for feature branch: ${featureBranch} in ${repoOwner}/${websiteRepo}...`);
|
||||
|
||||
const { data: pulls } = await github.rest.pulls.list({
|
||||
owner: repoOwner,
|
||||
repo: websiteRepo,
|
||||
state: 'closed',
|
||||
per_page: 100,
|
||||
});
|
||||
|
||||
const mergedPr = pulls.find(
|
||||
(pr) => pr.merged_at && pr.title.includes(featureBranch)
|
||||
);
|
||||
|
||||
if (!mergedPr) {
|
||||
core.setFailed(
|
||||
`❌ Documentation PR for branch '${featureBranch}' is required and has not been merged.`
|
||||
);
|
||||
} else {
|
||||
core.info(`✅ Found merged documentation PR: ${mergedPr.html_url}`);
|
||||
}
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -17,6 +17,7 @@ __pycache__/*
|
||||
kube-burner*
|
||||
kube_burner*
|
||||
recommender_*.json
|
||||
resiliency*.json
|
||||
|
||||
# Project files
|
||||
.ropeproject
|
||||
|
||||
141
BETA_FEATURE_POLICY.md
Normal file
141
BETA_FEATURE_POLICY.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# Beta Features Policy
|
||||
|
||||
## Overview
|
||||
|
||||
Beta features provide users early access to new capabilities before they reach full stability and general availability (GA). These features allow maintainers to gather feedback, validate usability, and improve functionality based on real-world usage.
|
||||
|
||||
Beta features are intended for experimentation and evaluation. While they are functional, they may not yet meet the stability, performance, or backward compatibility guarantees expected from generally available features.
|
||||
|
||||
---
|
||||
|
||||
## What is a Beta Feature
|
||||
|
||||
A **Beta feature** is a feature that is released for user evaluation but is still under active development and refinement.
|
||||
|
||||
Beta features may have the following characteristics:
|
||||
|
||||
- Functionally usable but still evolving
|
||||
- APIs or behavior may change between releases
|
||||
- Performance optimizations may still be in progress
|
||||
- Documentation may be limited or evolving
|
||||
- Edge cases may not be fully validated
|
||||
|
||||
Beta features should be considered **experimental and optional**.
|
||||
|
||||
---
|
||||
|
||||
## User Expectations
|
||||
|
||||
Users trying Beta features should understand the following:
|
||||
|
||||
- Stability is not guaranteed
|
||||
- APIs and functionality may change without notice
|
||||
- Backward compatibility is not guaranteed
|
||||
- The feature may evolve significantly before GA
|
||||
- Production use should be evaluated carefully
|
||||
|
||||
We strongly encourage users to provide feedback to help improve the feature before it becomes generally available.
|
||||
|
||||
---
|
||||
|
||||
## Beta Feature Identification
|
||||
|
||||
All Beta features are clearly identified to ensure transparency.
|
||||
|
||||
### In Release Notes
|
||||
|
||||
Beta features will be marked with a **[BETA]** tag.
|
||||
|
||||
Example: [BETA] Krkn Resilience Score
|
||||
|
||||
|
||||
### In Documentation
|
||||
|
||||
Beta features will include a notice similar to:
|
||||
|
||||
> **Beta Feature**
|
||||
> This feature is currently in Beta and is intended for early user feedback. Behavior, APIs, and stability may change in future releases.
|
||||
|
||||
---
|
||||
|
||||
## Feature Lifecycle
|
||||
|
||||
Features typically progress through the following lifecycle stages.
|
||||
|
||||
### 1. Development
|
||||
The feature is under active development and may not yet be visible to users.
|
||||
|
||||
### 2. Beta
|
||||
The feature is released for early adoption and feedback.
|
||||
|
||||
Characteristics:
|
||||
|
||||
- Feature is usable
|
||||
- Feedback is encouraged
|
||||
- Stability improvements are ongoing
|
||||
|
||||
### 3. Stabilization
|
||||
Based on user feedback and testing, the feature is improved to meet stability and usability expectations.
|
||||
|
||||
### 4. General Availability (GA)
|
||||
|
||||
The feature is considered stable and production-ready.
|
||||
|
||||
GA features provide:
|
||||
|
||||
- Stable APIs
|
||||
- Backward compatibility guarantees
|
||||
- Complete documentation
|
||||
- Full CI test coverage
|
||||
|
||||
---
|
||||
|
||||
## Promotion to General Availability
|
||||
|
||||
A Beta feature may be promoted to GA once the following criteria are met:
|
||||
|
||||
- Critical bugs are resolved
|
||||
- Feature stability has improved through testing
|
||||
- APIs and behavior are stable
|
||||
- Documentation is complete
|
||||
- Community feedback has been incorporated
|
||||
|
||||
The promotion will be announced in the release notes.
|
||||
|
||||
Example: Feature promoted from Beta to GA
|
||||
|
||||
|
||||
---
|
||||
|
||||
## Deprecation of Beta Features
|
||||
|
||||
In some cases, a Beta feature may be redesigned or discontinued.
|
||||
|
||||
If this happens:
|
||||
|
||||
- The feature will be marked as **Deprecated**
|
||||
- A removal timeline will be provided
|
||||
- Alternative approaches will be documented when possible
|
||||
|
||||
Example: [DEPRECATED] This feature will be removed in a future release.
|
||||
|
||||
---
|
||||
|
||||
## Contributing Feedback
|
||||
User feedback plays a critical role in improving Beta features.
|
||||
|
||||
Users are encouraged to report:
|
||||
|
||||
- Bugs
|
||||
- Usability issues
|
||||
- Performance concerns
|
||||
- Feature suggestions
|
||||
|
||||
Feedback can be submitted through:
|
||||
|
||||
- Krkn GitHub Issues
|
||||
- Krkn GitHub Discussions
|
||||
- Krkn Community channels
|
||||
|
||||
Please include **Beta feature context** when reporting issues.
|
||||
Your feedback helps guide the roadmap and ensures features are production-ready before GA.
|
||||
@@ -55,6 +55,10 @@ kraken:
|
||||
- kubevirt_vm_outage:
|
||||
- scenarios/kubevirt/kubevirt-vm-outage.yaml
|
||||
|
||||
resiliency:
|
||||
resiliency_run_mode: standalone # Options: standalone, controller, disabled
|
||||
resiliency_file: config/alerts.yaml # Path to SLO definitions, will resolve to performance_monitoring: alert_profile: if not specified
|
||||
|
||||
cerberus:
|
||||
cerberus_enabled: False # Enable it when cerberus is previously installed
|
||||
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal
|
||||
@@ -131,4 +135,5 @@ kubevirt_checks: # Utilizing virt che
|
||||
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
|
||||
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
|
||||
node_names: ""
|
||||
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
|
||||
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False
|
||||
|
||||
|
||||
@@ -163,6 +163,15 @@
|
||||
"default": "False",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "es-run-tag",
|
||||
"short_description": "Elasticsearch run tag",
|
||||
"description": "Elasticsearch run tag to compare similar runs",
|
||||
"variable": "ES_RUN_TAG",
|
||||
"type": "string",
|
||||
"default": "",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "es-server",
|
||||
"short_description": "Elasticsearch instance URL",
|
||||
|
||||
79
krkn/prometheus/collector.py
Normal file
79
krkn/prometheus/collector.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Dict, Any, List, Optional
|
||||
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# SLO evaluation helpers (used by krkn.resiliency)
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def slo_passed(prometheus_result: List[Any]) -> Optional[bool]:
|
||||
if not prometheus_result:
|
||||
return None
|
||||
has_samples = False
|
||||
for series in prometheus_result:
|
||||
if "values" in series:
|
||||
has_samples = True
|
||||
for _ts, val in series["values"]:
|
||||
try:
|
||||
if float(val) > 0:
|
||||
return False
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
elif "value" in series:
|
||||
has_samples = True
|
||||
try:
|
||||
return float(series["value"][1]) == 0
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
# If we reached here and never saw any samples, skip
|
||||
return None if not has_samples else True
|
||||
|
||||
|
||||
def evaluate_slos(
|
||||
prom_cli: KrknPrometheus,
|
||||
slo_list: List[Dict[str, Any]],
|
||||
start_time: datetime.datetime,
|
||||
end_time: datetime.datetime,
|
||||
) -> Dict[str, bool]:
|
||||
"""Evaluate a list of SLO expressions against Prometheus.
|
||||
|
||||
Args:
|
||||
prom_cli: Configured Prometheus client.
|
||||
slo_list: List of dicts with keys ``name``, ``expr``.
|
||||
start_time: Start timestamp.
|
||||
end_time: End timestamp.
|
||||
granularity: Step in seconds for range queries.
|
||||
Returns:
|
||||
Mapping name -> bool indicating pass status.
|
||||
True means good we passed the SLO test otherwise failed the SLO
|
||||
"""
|
||||
results: Dict[str, bool] = {}
|
||||
logging.info("Evaluating %d SLOs over window %s – %s", len(slo_list), start_time, end_time)
|
||||
for slo in slo_list:
|
||||
expr = slo["expr"]
|
||||
name = slo["name"]
|
||||
try:
|
||||
response = prom_cli.process_prom_query_in_range(
|
||||
expr,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
)
|
||||
|
||||
passed = slo_passed(response)
|
||||
if passed is None:
|
||||
# Absence of data indicates the condition did not trigger; treat as pass.
|
||||
logging.debug("SLO '%s' query returned no data; assuming pass.", name)
|
||||
results[name] = True
|
||||
else:
|
||||
results[name] = passed
|
||||
except Exception as exc:
|
||||
logging.error("PromQL query failed for SLO '%s': %s", name, exc)
|
||||
results[name] = False
|
||||
return results
|
||||
4
krkn/resiliency/__init__.py
Normal file
4
krkn/resiliency/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""krkn.resiliency package public interface."""
|
||||
|
||||
from .resiliency import Resiliency # noqa: F401
|
||||
from .score import calculate_resiliency_score # noqa: F401
|
||||
366
krkn/resiliency/resiliency.py
Normal file
366
krkn/resiliency/resiliency.py
Normal file
@@ -0,0 +1,366 @@
|
||||
"""Resiliency evaluation orchestrator for Krkn chaos runs.
|
||||
|
||||
This module provides the `Resiliency` class which loads the canonical
|
||||
`alerts.yaml`, executes every SLO expression against Prometheus in the
|
||||
chaos-test time window, determines pass/fail status and calculates an
|
||||
overall resiliency score using the generic weighted model implemented
|
||||
in `krkn.resiliency.score`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
|
||||
import yaml
|
||||
import json
|
||||
import dataclasses
|
||||
from krkn_lib.models.telemetry import ChaosRunTelemetry
|
||||
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
from krkn.prometheus.collector import evaluate_slos
|
||||
from krkn.resiliency.score import calculate_resiliency_score
|
||||
|
||||
|
||||
class Resiliency:
|
||||
"""Central orchestrator for resiliency scoring."""
|
||||
|
||||
def __init__(self, alerts_yaml_path: str):
|
||||
|
||||
if not os.path.exists(alerts_yaml_path):
|
||||
raise FileNotFoundError(f"alerts file not found: {alerts_yaml_path}")
|
||||
with open(alerts_yaml_path, "r", encoding="utf-8") as fp:
|
||||
raw_yaml_data = yaml.safe_load(fp)
|
||||
logging.info("Loaded SLO configuration from %s", alerts_yaml_path)
|
||||
|
||||
self._slos = self._normalise_alerts(raw_yaml_data)
|
||||
self._results: Dict[str, bool] = {}
|
||||
self._score: Optional[int] = None
|
||||
self._breakdown: Optional[Dict[str, int]] = None
|
||||
self._health_check_results: Dict[str, bool] = {}
|
||||
self.scenario_reports: List[Dict[str, Any]] = []
|
||||
self.summary: Optional[Dict[str, Any]] = None
|
||||
self.detailed_report: Optional[Dict[str, Any]] = None
|
||||
|
||||
# ---------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
def calculate_score(
|
||||
self,
|
||||
*,
|
||||
health_check_results: Optional[Dict[str, bool]] = None,
|
||||
) -> int:
|
||||
"""Calculate the resiliency score using collected SLO results."""
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=self._results,
|
||||
health_check_results=health_check_results or {},
|
||||
)
|
||||
self._score = score
|
||||
self._breakdown = breakdown
|
||||
self._health_check_results = health_check_results or {}
|
||||
return score
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Return a dictionary ready for telemetry output."""
|
||||
if self._score is None:
|
||||
raise RuntimeError("calculate_score() must be called before to_dict()")
|
||||
return {
|
||||
"score": self._score,
|
||||
"breakdown": self._breakdown,
|
||||
"slo_results": self._results,
|
||||
"health_check_results": getattr(self, "_health_check_results", {}),
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Scenario-based resiliency evaluation
|
||||
# ------------------------------------------------------------------
|
||||
def add_scenario_report(
|
||||
self,
|
||||
*,
|
||||
scenario_name: str,
|
||||
prom_cli: KrknPrometheus,
|
||||
start_time: datetime.datetime,
|
||||
end_time: datetime.datetime,
|
||||
weight: float | int = 1,
|
||||
health_check_results: Optional[Dict[str, bool]] = None,
|
||||
) -> int:
|
||||
"""
|
||||
Evaluate SLOs for a single scenario window and store the result.
|
||||
|
||||
Args:
|
||||
scenario_name: Human-friendly scenario identifier.
|
||||
prom_cli: Initialized KrknPrometheus instance.
|
||||
start_time: Window start.
|
||||
end_time: Window end.
|
||||
weight: Weight to use for the final weighted average calculation.
|
||||
health_check_results: Optional mapping of custom health-check name ➡ bool.
|
||||
Returns:
|
||||
The calculated integer resiliency score (0-100) for this scenario.
|
||||
"""
|
||||
slo_results = evaluate_slos(
|
||||
prom_cli=prom_cli,
|
||||
slo_list=self._slos,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
)
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=slo_results,
|
||||
health_check_results=health_check_results or {},
|
||||
)
|
||||
self.scenario_reports.append(
|
||||
{
|
||||
"name": scenario_name,
|
||||
"window": {
|
||||
"start": start_time.isoformat(),
|
||||
"end": end_time.isoformat(),
|
||||
},
|
||||
"score": score,
|
||||
"weight": weight,
|
||||
"breakdown": breakdown,
|
||||
"slo_results": slo_results,
|
||||
"health_check_results": health_check_results or {},
|
||||
}
|
||||
)
|
||||
return score
|
||||
|
||||
def finalize_report(
|
||||
self,
|
||||
*,
|
||||
prom_cli: KrknPrometheus,
|
||||
total_start_time: datetime.datetime,
|
||||
total_end_time: datetime.datetime,
|
||||
) -> None:
|
||||
if not self.scenario_reports:
|
||||
raise RuntimeError("No scenario reports added – nothing to finalize")
|
||||
|
||||
# ---------------- Weighted average (primary resiliency_score) ----------
|
||||
total_weight = sum(rep["weight"] for rep in self.scenario_reports)
|
||||
resiliency_score = int(
|
||||
sum(rep["score"] * rep["weight"] for rep in self.scenario_reports) / total_weight
|
||||
)
|
||||
|
||||
# ---------------- Overall SLO evaluation across full test window -----------------------------
|
||||
full_slo_results = evaluate_slos(
|
||||
prom_cli=prom_cli,
|
||||
slo_list=self._slos,
|
||||
start_time=total_start_time,
|
||||
end_time=total_end_time,
|
||||
)
|
||||
slo_defs = {slo["name"]: {"severity": slo["severity"], "weight": slo.get("weight")} for slo in self._slos}
|
||||
_overall_score, full_breakdown = calculate_resiliency_score(
|
||||
slo_definitions=slo_defs,
|
||||
prometheus_results=full_slo_results,
|
||||
health_check_results={},
|
||||
)
|
||||
|
||||
self.summary = {
|
||||
"scenarios": {rep["name"]: rep["score"] for rep in self.scenario_reports},
|
||||
"resiliency_score": resiliency_score,
|
||||
"passed_slos": full_breakdown.get("passed", 0),
|
||||
"total_slos": full_breakdown.get("passed", 0) + full_breakdown.get("failed", 0),
|
||||
}
|
||||
|
||||
# Detailed report currently limited to per-scenario information; system stability section removed
|
||||
self.detailed_report = {
|
||||
"scenarios": self.scenario_reports,
|
||||
}
|
||||
|
||||
def get_summary(self) -> Dict[str, Any]:
|
||||
"""Return the concise resiliency_summary structure."""
|
||||
if not hasattr(self, "summary") or self.summary is None:
|
||||
raise RuntimeError("finalize_report() must be called first")
|
||||
return self.summary
|
||||
|
||||
def get_detailed_report(self) -> Dict[str, Any]:
|
||||
"""Return the full resiliency-report structure."""
|
||||
if not hasattr(self, "detailed_report") or self.detailed_report is None:
|
||||
raise RuntimeError("finalize_report() must be called first")
|
||||
return self.detailed_report
|
||||
|
||||
@staticmethod
|
||||
def compact_breakdown(report: Dict[str, Any]) -> Dict[str, int]:
|
||||
"""Return a compact summary dict for a single scenario report."""
|
||||
try:
|
||||
passed = report["breakdown"]["passed"]
|
||||
failed = report["breakdown"]["failed"]
|
||||
score_val = report["score"]
|
||||
except Exception:
|
||||
passed = report.get("breakdown", {}).get("passed", 0)
|
||||
failed = report.get("breakdown", {}).get("failed", 0)
|
||||
score_val = report.get("score", 0)
|
||||
return {
|
||||
"resiliency_score": score_val,
|
||||
"passed_slos": passed,
|
||||
"total_slos": passed + failed,
|
||||
}
|
||||
|
||||
def attach_compact_to_telemetry(self, chaos_telemetry: ChaosRunTelemetry) -> None:
|
||||
"""Embed per-scenario compact resiliency reports into a ChaosRunTelemetry instance."""
|
||||
score_map = {
|
||||
rep["name"]: self.compact_breakdown(rep) for rep in self.scenario_reports
|
||||
}
|
||||
new_scenarios = []
|
||||
for item in getattr(chaos_telemetry, "scenarios", []):
|
||||
if isinstance(item, dict):
|
||||
name = item.get("scenario")
|
||||
if name in score_map:
|
||||
item["resiliency_report"] = score_map[name]
|
||||
new_scenarios.append(item)
|
||||
else:
|
||||
name = getattr(item, "scenario", None)
|
||||
try:
|
||||
item_dict = dataclasses.asdict(item)
|
||||
except Exception:
|
||||
item_dict = {
|
||||
k: getattr(item, k)
|
||||
for k in dir(item)
|
||||
if not k.startswith("__") and not callable(getattr(item, k))
|
||||
}
|
||||
if name in score_map:
|
||||
item_dict["resiliency_report"] = score_map[name]
|
||||
new_scenarios.append(item_dict)
|
||||
chaos_telemetry.scenarios = new_scenarios
|
||||
|
||||
def add_scenario_reports(
|
||||
self,
|
||||
*,
|
||||
scenario_telemetries,
|
||||
prom_cli: KrknPrometheus,
|
||||
scenario_type: str,
|
||||
batch_start_dt: datetime.datetime,
|
||||
batch_end_dt: datetime.datetime,
|
||||
weight: int | float = 1,
|
||||
) -> None:
|
||||
"""Evaluate SLOs for every telemetry item belonging to a scenario window,
|
||||
store the result and enrich the telemetry list with a compact resiliency breakdown.
|
||||
|
||||
Args:
|
||||
scenario_telemetries: Iterable with telemetry objects/dicts for the
|
||||
current scenario batch window.
|
||||
prom_cli: Pre-configured :class:`KrknPrometheus` instance.
|
||||
scenario_type: Fallback scenario identifier in case individual
|
||||
telemetry items do not provide one.
|
||||
batch_start_dt: Fallback start timestamp for the batch window.
|
||||
batch_end_dt: Fallback end timestamp for the batch window.
|
||||
weight: Weight to assign to every scenario when calculating the final
|
||||
weighted average.
|
||||
logger: Optional custom logger.
|
||||
"""
|
||||
|
||||
for tel in scenario_telemetries:
|
||||
try:
|
||||
# -------- Extract timestamps & scenario name --------------------
|
||||
if isinstance(tel, dict):
|
||||
st_ts = tel.get("start_timestamp")
|
||||
en_ts = tel.get("end_timestamp")
|
||||
scen_name = tel.get("scenario", scenario_type)
|
||||
else:
|
||||
st_ts = getattr(tel, "start_timestamp", None)
|
||||
en_ts = getattr(tel, "end_timestamp", None)
|
||||
scen_name = getattr(tel, "scenario", scenario_type)
|
||||
|
||||
if st_ts and en_ts:
|
||||
st_dt = datetime.datetime.fromtimestamp(int(st_ts))
|
||||
en_dt = datetime.datetime.fromtimestamp(int(en_ts))
|
||||
else:
|
||||
st_dt = batch_start_dt
|
||||
en_dt = batch_end_dt
|
||||
|
||||
# -------- Calculate resiliency score for the scenario -----------
|
||||
self.add_scenario_report(
|
||||
scenario_name=str(scen_name),
|
||||
prom_cli=prom_cli,
|
||||
start_time=st_dt,
|
||||
end_time=en_dt,
|
||||
weight=weight,
|
||||
health_check_results=None,
|
||||
)
|
||||
|
||||
compact = self.compact_breakdown(self.scenario_reports[-1])
|
||||
if isinstance(tel, dict):
|
||||
tel["resiliency_report"] = compact
|
||||
else:
|
||||
setattr(tel, "resiliency_report", compact)
|
||||
except Exception as exc:
|
||||
logging.error("Resiliency per-scenario evaluation failed: %s", exc)
|
||||
|
||||
def finalize_and_save(
|
||||
self,
|
||||
*,
|
||||
prom_cli: KrknPrometheus,
|
||||
total_start_time: datetime.datetime,
|
||||
total_end_time: datetime.datetime,
|
||||
run_mode: str = "standalone",
|
||||
detailed_path: str = "resiliency-report.json",
|
||||
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
||||
"""Finalize resiliency scoring, persist reports and return them.
|
||||
|
||||
Args:
|
||||
prom_cli: Pre-configured KrknPrometheus instance.
|
||||
total_start_time: Start time for the full test window.
|
||||
total_end_time: End time for the full test window.
|
||||
run_mode: "controller" or "standalone" mode.
|
||||
|
||||
Returns:
|
||||
(detailed_report)
|
||||
"""
|
||||
|
||||
try:
|
||||
self.finalize_report(
|
||||
prom_cli=prom_cli,
|
||||
total_start_time=total_start_time,
|
||||
total_end_time=total_end_time,
|
||||
)
|
||||
detailed = self.get_detailed_report()
|
||||
|
||||
if run_mode == "controller":
|
||||
# krknctl expects the detailed report on stdout in a special format
|
||||
try:
|
||||
detailed_json = json.dumps(detailed)
|
||||
print(f"KRKN_RESILIENCY_REPORT_JSON:{detailed_json}")
|
||||
logging.info("Resiliency report logged to stdout for krknctl.")
|
||||
except Exception as exc:
|
||||
logging.error("Failed to serialize and log detailed resiliency report: %s", exc)
|
||||
else:
|
||||
# Stand-alone mode – write to files for post-run consumption
|
||||
try:
|
||||
with open(detailed_path, "w", encoding="utf-8") as fp:
|
||||
json.dump(detailed, fp, indent=2)
|
||||
logging.info("Resiliency report written: %s", detailed_path)
|
||||
except Exception as io_exc:
|
||||
logging.error("Failed to write resiliency report files: %s", io_exc)
|
||||
|
||||
except Exception as exc:
|
||||
logging.error("Failed to finalize resiliency scoring: %s", exc)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
@staticmethod
|
||||
def _normalise_alerts(raw_alerts: Any) -> List[Dict[str, Any]]:
|
||||
"""Convert raw YAML alerts data into internal SLO list structure."""
|
||||
if not isinstance(raw_alerts, list):
|
||||
raise ValueError("SLO configuration must be a list under key 'slos' or top-level list")
|
||||
|
||||
slos: List[Dict[str, Any]] = []
|
||||
for idx, alert in enumerate(raw_alerts):
|
||||
if not (isinstance(alert, dict) and "expr" in alert and "severity" in alert):
|
||||
logging.warning("Skipping invalid alert entry at index %d: %s", idx, alert)
|
||||
continue
|
||||
name = alert.get("description") or f"slo_{idx}"
|
||||
slos.append(
|
||||
{
|
||||
"name": name,
|
||||
"expr": alert["expr"],
|
||||
"severity": str(alert["severity"]).lower(),
|
||||
"weight": alert.get("weight")
|
||||
}
|
||||
)
|
||||
return slos
|
||||
76
krkn/resiliency/score.py
Normal file
76
krkn/resiliency/score.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
DEFAULT_WEIGHTS = {"critical": 3, "warning": 1}
|
||||
|
||||
|
||||
class SLOResult:
|
||||
"""Simple container representing evaluation outcome for a single SLO."""
|
||||
|
||||
def __init__(self, name: str, severity: str, passed: bool, weight: int | None = None):
|
||||
self.name = name
|
||||
self.severity = severity
|
||||
self.passed = passed
|
||||
self._custom_weight = weight
|
||||
|
||||
def weight(self, severity_weights: Dict[str, int]) -> int:
|
||||
"""Return the weight for this SLO. Uses custom weight if set, otherwise uses severity-based weight."""
|
||||
if self._custom_weight is not None:
|
||||
return self._custom_weight
|
||||
return severity_weights.get(self.severity, severity_weights.get("warning", 1))
|
||||
|
||||
|
||||
def calculate_resiliency_score(
|
||||
slo_definitions: Dict[str, str] | Dict[str, Dict[str, int | str | None]],
|
||||
prometheus_results: Dict[str, bool],
|
||||
health_check_results: Dict[str, bool],
|
||||
) -> Tuple[int, Dict[str, int]]:
|
||||
"""Compute a resiliency score between 0-100 based on SLO pass/fail results.
|
||||
|
||||
Args:
|
||||
slo_definitions: Mapping of SLO name -> severity ("critical" | "warning") OR
|
||||
SLO name -> {"severity": str, "weight": int | None}.
|
||||
prometheus_results: Mapping of SLO name -> bool indicating whether the SLO
|
||||
passed. Any SLO missing in this mapping is treated as failed.
|
||||
health_check_results: Mapping of custom health-check name -> bool pass flag.
|
||||
These checks are always treated as *critical*.
|
||||
|
||||
Returns:
|
||||
Tuple containing (final_score, breakdown) where *breakdown* is a dict with
|
||||
the counts of passed/failed SLOs per severity.
|
||||
"""
|
||||
|
||||
slo_objects: List[SLOResult] = []
|
||||
for slo_name, slo_def in slo_definitions.items():
|
||||
# Exclude SLOs that were not evaluated (query returned no data)
|
||||
if slo_name not in prometheus_results:
|
||||
continue
|
||||
passed = bool(prometheus_results[slo_name])
|
||||
|
||||
# Support both old format (str) and new format (dict)
|
||||
if isinstance(slo_def, str):
|
||||
severity = slo_def
|
||||
slo_weight = None
|
||||
else:
|
||||
severity = slo_def.get("severity", "warning")
|
||||
slo_weight = slo_def.get("weight")
|
||||
|
||||
slo_objects.append(SLOResult(slo_name, severity, passed, weight=slo_weight))
|
||||
|
||||
# Health-check SLOs (by default keeping them critical)
|
||||
for hc_name, hc_passed in health_check_results.items():
|
||||
slo_objects.append(SLOResult(hc_name, "critical", bool(hc_passed)))
|
||||
|
||||
total_points = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects)
|
||||
points_lost = sum(slo.weight(DEFAULT_WEIGHTS) for slo in slo_objects if not slo.passed)
|
||||
|
||||
score = 0 if total_points == 0 else int(((total_points - points_lost) / total_points) * 100)
|
||||
|
||||
breakdown = {
|
||||
"total_points": total_points,
|
||||
"points_lost": points_lost,
|
||||
"passed": len([s for s in slo_objects if s.passed]),
|
||||
"failed": len([s for s in slo_objects if not s.passed]),
|
||||
}
|
||||
return score, breakdown
|
||||
@@ -12,7 +12,7 @@ import uuid
|
||||
import time
|
||||
import queue
|
||||
import threading
|
||||
from typing import Optional
|
||||
from typing import Optional, Dict
|
||||
|
||||
from krkn import cerberus
|
||||
from krkn_lib.elastic.krkn_elastic import KrknElastic
|
||||
@@ -21,11 +21,15 @@ from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
|
||||
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
|
||||
import krkn.prometheus as prometheus_plugin
|
||||
import server as server
|
||||
from krkn.resiliency.resiliency import (
|
||||
Resiliency
|
||||
)
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.ocp import KrknOpenshift
|
||||
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.models.telemetry import ChaosRunTelemetry
|
||||
from krkn_lib.models.k8s import ResiliencyReport
|
||||
from krkn_lib.utils import SafeLogger
|
||||
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
|
||||
|
||||
@@ -54,6 +58,8 @@ def main(options, command: Optional[str]) -> int:
|
||||
print(pyfiglet.figlet_format("kraken"))
|
||||
logging.info("Starting kraken")
|
||||
|
||||
|
||||
|
||||
cfg = options.cfg
|
||||
# Parse and read the config
|
||||
if os.path.isfile(cfg):
|
||||
@@ -65,6 +71,7 @@ def main(options, command: Optional[str]) -> int:
|
||||
get_yaml_item_value(config["kraken"], "kubeconfig_path", "")
|
||||
)
|
||||
kraken_config = cfg
|
||||
|
||||
chaos_scenarios = get_yaml_item_value(config["kraken"], "chaos_scenarios", [])
|
||||
publish_running_status = get_yaml_item_value(
|
||||
config["kraken"], "publish_kraken_status", False
|
||||
@@ -86,14 +93,20 @@ def main(options, command: Optional[str]) -> int:
|
||||
config["kraken"], "signal_address", "0.0.0.0"
|
||||
)
|
||||
run_signal = get_yaml_item_value(config["kraken"], "signal_state", "RUN")
|
||||
|
||||
resiliency_config = get_yaml_item_value(config,"resiliency",{})
|
||||
# Determine execution mode (standalone, controller, or disabled)
|
||||
run_mode = get_yaml_item_value(resiliency_config, "resiliency_run_mode", "standalone")
|
||||
valid_run_modes = {"standalone", "detailed", "disabled"}
|
||||
if run_mode not in valid_run_modes:
|
||||
logging.warning("Unknown resiliency_run_mode '%s'. Defaulting to 'standalone'", run_mode)
|
||||
run_mode = "standalone"
|
||||
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
|
||||
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
|
||||
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
|
||||
|
||||
prometheus_url = config["performance_monitoring"].get("prometheus_url")
|
||||
prometheus_bearer_token = config["performance_monitoring"].get(
|
||||
"prometheus_bearer_token"
|
||||
)
|
||||
prometheus_bearer_token = config["performance_monitoring"].get("prometheus_bearer_token")
|
||||
run_uuid = config["performance_monitoring"].get("uuid")
|
||||
enable_alerts = get_yaml_item_value(
|
||||
config["performance_monitoring"], "enable_alerts", False
|
||||
@@ -101,6 +114,10 @@ def main(options, command: Optional[str]) -> int:
|
||||
enable_metrics = get_yaml_item_value(
|
||||
config["performance_monitoring"], "enable_metrics", False
|
||||
)
|
||||
|
||||
|
||||
# Default placeholder; will be overridden if a Prometheus URL is available
|
||||
prometheus = None
|
||||
# elastic search
|
||||
enable_elastic = get_yaml_item_value(config["elastic"], "enable_elastic", False)
|
||||
elastic_run_tag = get_yaml_item_value(config["elastic"], "run_tag", "")
|
||||
@@ -231,6 +248,11 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
logging.info("Cluster version CRD not detected, skipping")
|
||||
|
||||
# Final check: ensure Prometheus URL is available; disable resiliency if not
|
||||
if (not prometheus_url or prometheus_url.strip() == "") and run_mode != "disabled":
|
||||
logging.warning("Prometheus URL not provided; disabling resiliency score features.")
|
||||
run_mode = "disabled"
|
||||
|
||||
# KrknTelemetry init
|
||||
telemetry_k8s = KrknTelemetryKubernetes(
|
||||
safe_logger, kubecli, config["telemetry"]
|
||||
@@ -251,9 +273,18 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
elastic_search = None
|
||||
summary = ChaosRunAlertSummary()
|
||||
if enable_metrics or enable_alerts or check_critical_alerts:
|
||||
if enable_metrics or enable_alerts or check_critical_alerts or run_mode != "disabled":
|
||||
prometheus = KrknPrometheus(prometheus_url, prometheus_bearer_token)
|
||||
|
||||
# Quick connectivity probe for Prometheus – disable resiliency if unreachable
|
||||
try:
|
||||
prometheus.process_prom_query_in_range(
|
||||
"up", datetime.datetime.utcnow() - datetime.timedelta(seconds=60), datetime.datetime.utcnow(), granularity=60
|
||||
)
|
||||
except Exception as prom_exc:
|
||||
logging.error("Prometheus connectivity test failed: %s. Disabling resiliency features as Prometheus is required for SLO evaluation.", prom_exc)
|
||||
run_mode = "disabled"
|
||||
resiliency_alerts = get_yaml_item_value(resiliency_config, "resiliency_file", get_yaml_item_value(config['performance_monitoring'],"alert_profile", "config/alerts.yaml"))
|
||||
resiliency_obj = Resiliency(resiliency_alerts) if run_mode != "disabled" else None # Initialize resiliency orchestrator
|
||||
logging.info("Server URL: %s" % kubecli.get_host())
|
||||
|
||||
if command == "list-rollback":
|
||||
@@ -369,6 +400,8 @@ def main(options, command: Optional[str]) -> int:
|
||||
)
|
||||
sys.exit(-1)
|
||||
|
||||
|
||||
batch_window_start_dt = datetime.datetime.utcnow()
|
||||
failed_scenarios_current, scenario_telemetries = (
|
||||
scenario_plugin.run_scenarios(
|
||||
run_uuid, scenarios_list, config, telemetry_ocp
|
||||
@@ -376,6 +409,15 @@ def main(options, command: Optional[str]) -> int:
|
||||
)
|
||||
failed_post_scenarios.extend(failed_scenarios_current)
|
||||
chaos_telemetry.scenarios.extend(scenario_telemetries)
|
||||
batch_window_end_dt = datetime.datetime.utcnow()
|
||||
if resiliency_obj:
|
||||
resiliency_obj.add_scenario_reports(
|
||||
scenario_telemetries=scenario_telemetries,
|
||||
prom_cli=prometheus,
|
||||
scenario_type=scenario_type,
|
||||
batch_start_dt=batch_window_start_dt,
|
||||
batch_end_dt=batch_window_end_dt,
|
||||
)
|
||||
|
||||
post_critical_alerts = 0
|
||||
if check_critical_alerts:
|
||||
@@ -440,12 +482,41 @@ def main(options, command: Optional[str]) -> int:
|
||||
else:
|
||||
logging.info("No error logs collected during chaos run")
|
||||
chaos_telemetry.error_logs = []
|
||||
if resiliency_obj:
|
||||
try:
|
||||
resiliency_obj.attach_compact_to_telemetry(chaos_telemetry)
|
||||
except Exception as exc:
|
||||
logging.error("Failed to embed per-scenario resiliency in telemetry: %s", exc)
|
||||
|
||||
if resiliency_obj:
|
||||
try:
|
||||
resiliency_obj.finalize_and_save(
|
||||
prom_cli=prometheus,
|
||||
total_start_time=datetime.datetime.fromtimestamp(start_time),
|
||||
total_end_time=datetime.datetime.fromtimestamp(end_time),
|
||||
run_mode=run_mode,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Failed to finalize resiliency scoring: %s", e)
|
||||
|
||||
|
||||
telemetry_json = chaos_telemetry.to_json()
|
||||
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
|
||||
if resiliency_obj and hasattr(resiliency_obj, "summary") and resiliency_obj.summary is not None:
|
||||
summary_dict = resiliency_obj.get_summary()
|
||||
decoded_chaos_run_telemetry.overall_resiliency_report = ResiliencyReport(
|
||||
json_object=summary_dict,
|
||||
resiliency_score=summary_dict.get("resiliency_score", 0),
|
||||
passed_slos=summary_dict.get("passed_slos", 0),
|
||||
total_slos=summary_dict.get("total_slos", 0)
|
||||
)
|
||||
chaos_output.telemetry = decoded_chaos_run_telemetry
|
||||
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
|
||||
if enable_elastic:
|
||||
elastic_telemetry = ElasticChaosRunTelemetry(
|
||||
chaos_run_telemetry=decoded_chaos_run_telemetry
|
||||
)
|
||||
result = elastic_search.push_telemetry(
|
||||
decoded_chaos_run_telemetry, elastic_telemetry_index
|
||||
)
|
||||
|
||||
401
tests/test_prometheus_collector.py
Normal file
401
tests/test_prometheus_collector.py
Normal file
@@ -0,0 +1,401 @@
|
||||
"""
|
||||
Tests for krkn.prometheus.collector module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_prometheus_collector
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_prometheus_collector -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_prometheus_collector.TestSLOPassed
|
||||
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_prometheus_collector.TestSLOPassed.test_empty_result_returns_none
|
||||
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs.test_evaluate_single_slo_passing
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_prometheus_collector
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
from krkn.prometheus.collector import slo_passed, evaluate_slos
|
||||
|
||||
|
||||
class TestSLOPassed(unittest.TestCase):
|
||||
"""Test cases for the slo_passed function."""
|
||||
|
||||
def test_empty_result_returns_none(self):
|
||||
"""Test that an empty result list returns None."""
|
||||
result = slo_passed([])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_result_with_values_all_zero_returns_true(self):
|
||||
"""Test that all zero values in 'values' returns True."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
[1234567892, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_values_containing_nonzero_returns_false(self):
|
||||
"""Test that any non-zero value in 'values' returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "1.5"], # Non-zero value
|
||||
[1234567892, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_single_value_zero_returns_true(self):
|
||||
"""Test that a single 'value' field with zero returns True."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "0"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_single_value_nonzero_returns_false(self):
|
||||
"""Test that a single 'value' field with non-zero returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "5.2"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_no_samples_returns_none(self):
|
||||
"""Test that result with no 'values' or 'value' keys returns None."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"metric": {"job": "test"}
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_result_with_invalid_value_type_in_values(self):
|
||||
"""Test handling of invalid value types in 'values' field."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "invalid"], # Will raise ValueError
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
# Should continue processing after ValueError and find the zero
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_invalid_value_in_single_value_returns_false(self):
|
||||
"""Test that invalid value type in 'value' field returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "invalid"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_none_value_in_values(self):
|
||||
"""Test handling of None values in 'values' field."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, None], # Will raise TypeError
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
# Should continue processing after TypeError and find the zero
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_multiple_series_first_has_nonzero(self):
|
||||
"""Test that first non-zero value in any series returns False immediately."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "2.0"], # Non-zero in first series
|
||||
]
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_float_zero(self):
|
||||
"""Test that float zero is handled correctly."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0.0"],
|
||||
[1234567891, "0.00"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_scientific_notation(self):
|
||||
"""Test values in scientific notation."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0e0"],
|
||||
[1234567891, "1e-10"], # Very small but non-zero
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
class TestEvaluateSLOs(unittest.TestCase):
|
||||
"""Test cases for the evaluate_slos function."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
self.mock_prom_cli = Mock()
|
||||
self.start_time = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
self.end_time = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
def test_evaluate_single_slo_passing(self):
|
||||
"""Test evaluation of a single passing SLO."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "sum(rate(http_requests_total[5m]))"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with all zeros (passing)
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["test_slo"], True)
|
||||
self.mock_prom_cli.process_prom_query_in_range.assert_called_once_with(
|
||||
"sum(rate(http_requests_total[5m]))",
|
||||
start_time=self.start_time,
|
||||
end_time=self.end_time,
|
||||
)
|
||||
|
||||
def test_evaluate_single_slo_failing(self):
|
||||
"""Test evaluation of a single failing SLO."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "sum(rate(errors[5m]))"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with non-zero value (failing)
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "5"], # Non-zero indicates failure
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["test_slo"], False)
|
||||
|
||||
def test_evaluate_slo_with_no_data_returns_true(self):
|
||||
"""Test that SLO with no data (None) is treated as passing."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "absent(metric)"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with no samples
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = []
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# No data should be treated as passing
|
||||
self.assertEqual(results["test_slo"], True)
|
||||
|
||||
def test_evaluate_slo_query_exception_returns_false(self):
|
||||
"""Test that an exception during query results in False."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "invalid_query"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus client to raise an exception
|
||||
self.mock_prom_cli.process_prom_query_in_range.side_effect = Exception("Query failed")
|
||||
|
||||
with patch('krkn.prometheus.collector.logging') as mock_logging:
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Exception should result in False
|
||||
self.assertEqual(results["test_slo"], False)
|
||||
mock_logging.error.assert_called_once()
|
||||
|
||||
def test_evaluate_multiple_slos(self):
|
||||
"""Test evaluation of multiple SLOs with mixed results."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "slo_pass",
|
||||
"expr": "query1"
|
||||
},
|
||||
{
|
||||
"name": "slo_fail",
|
||||
"expr": "query2"
|
||||
},
|
||||
{
|
||||
"name": "slo_no_data",
|
||||
"expr": "query3"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock different responses for each query
|
||||
def mock_query_side_effect(expr, start_time, end_time):
|
||||
if expr == "query1":
|
||||
return [{"values": [[1234567890, "0"]]}]
|
||||
elif expr == "query2":
|
||||
return [{"values": [[1234567890, "1"]]}]
|
||||
else: # query3
|
||||
return []
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.side_effect = mock_query_side_effect
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["slo_pass"], True)
|
||||
self.assertEqual(results["slo_fail"], False)
|
||||
self.assertEqual(results["slo_no_data"], True)
|
||||
self.assertEqual(len(results), 3)
|
||||
|
||||
def test_evaluate_empty_slo_list(self):
|
||||
"""Test evaluation with an empty SLO list."""
|
||||
slo_list = []
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results, {})
|
||||
self.mock_prom_cli.process_prom_query_in_range.assert_not_called()
|
||||
|
||||
@patch('krkn.prometheus.collector.logging')
|
||||
def test_evaluate_slos_logs_info_message(self, mock_logging):
|
||||
"""Test that evaluation logs an info message with SLO count."""
|
||||
slo_list = [
|
||||
{"name": "slo1", "expr": "query1"},
|
||||
{"name": "slo2", "expr": "query2"},
|
||||
]
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{"values": [[1234567890, "0"]]}
|
||||
]
|
||||
|
||||
evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Check that info logging was called with the expected message
|
||||
mock_logging.info.assert_called_once()
|
||||
call_args = mock_logging.info.call_args[0]
|
||||
self.assertIn("Evaluating %d SLOs", call_args[0])
|
||||
self.assertEqual(call_args[1], 2)
|
||||
|
||||
@patch('krkn.prometheus.collector.logging')
|
||||
def test_evaluate_slos_logs_debug_for_no_data(self, mock_logging):
|
||||
"""Test that no data scenario logs a debug message."""
|
||||
slo_list = [
|
||||
{"name": "test_slo", "expr": "query"}
|
||||
]
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = []
|
||||
|
||||
evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Check that debug logging was called
|
||||
mock_logging.debug.assert_called_once()
|
||||
call_args = mock_logging.debug.call_args[0]
|
||||
self.assertIn("no data", call_args[0])
|
||||
self.assertIn("test_slo", call_args[1])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
624
tests/test_resiliency.py
Normal file
624
tests/test_resiliency.py
Normal file
@@ -0,0 +1,624 @@
|
||||
"""
|
||||
Tests for krkn.resiliency.resiliency module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_resiliency
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_resiliency -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_resiliency.TestResiliencyInit
|
||||
python -m unittest tests.test_resiliency.TestResiliencyCalculateScore
|
||||
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_resiliency.TestResiliencyInit.test_init_from_file
|
||||
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports.test_add_scenario_report
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_resiliency
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from krkn.resiliency.resiliency import Resiliency
|
||||
|
||||
|
||||
class TestResiliencyInit(unittest.TestCase):
|
||||
"""Test cases for Resiliency class initialization."""
|
||||
|
||||
def test_init_from_file(self):
|
||||
"""Test initialization from alerts.yaml file."""
|
||||
alerts_data = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "Instance down"},
|
||||
{"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump(alerts_data, f)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
res = Resiliency(alerts_yaml_path=temp_file)
|
||||
self.assertEqual(len(res._slos), 2)
|
||||
self.assertEqual(res._slos[0]["name"], "Instance down")
|
||||
self.assertEqual(res._slos[0]["expr"], "up == 0")
|
||||
self.assertEqual(res._slos[0]["severity"], "critical")
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
def test_init_from_file_not_found_raises_error(self):
|
||||
"""Test that missing alerts file raises FileNotFoundError."""
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
Resiliency(alerts_yaml_path="/nonexistent/path.yaml")
|
||||
|
||||
def test_init_preserves_custom_weight_on_slo(self):
|
||||
"""Test that custom weight is preserved from the alerts file."""
|
||||
alerts_data = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump(alerts_data, f)
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
res = Resiliency(alerts_yaml_path=temp_file)
|
||||
self.assertEqual(res._slos[0]["weight"], 10)
|
||||
finally:
|
||||
os.unlink(temp_file)
|
||||
|
||||
def test_normalise_alerts_with_valid_data(self):
|
||||
"""Test _normalise_alerts with valid alert data."""
|
||||
raw_alerts = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "Down"},
|
||||
{"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
|
||||
]
|
||||
|
||||
normalized = Resiliency._normalise_alerts(raw_alerts)
|
||||
|
||||
self.assertEqual(len(normalized), 2)
|
||||
self.assertEqual(normalized[0]["name"], "Down")
|
||||
self.assertEqual(normalized[1]["name"], "High CPU")
|
||||
|
||||
def test_normalise_alerts_without_description_uses_index(self):
|
||||
"""Test _normalise_alerts uses index as name when description missing."""
|
||||
raw_alerts = [
|
||||
{"expr": "up == 0", "severity": "critical"},
|
||||
]
|
||||
|
||||
normalized = Resiliency._normalise_alerts(raw_alerts)
|
||||
|
||||
self.assertEqual(normalized[0]["name"], "slo_0")
|
||||
|
||||
def test_normalise_alerts_skips_invalid_entries(self):
|
||||
"""Test _normalise_alerts skips entries missing required fields."""
|
||||
raw_alerts = [
|
||||
{"expr": "up == 0", "severity": "critical"}, # Valid
|
||||
{"severity": "warning"}, # Missing expr
|
||||
{"expr": "cpu > 80"}, # Missing severity
|
||||
"invalid", # Not a dict
|
||||
]
|
||||
|
||||
with patch('krkn.resiliency.resiliency.logging') as mock_logging:
|
||||
normalized = Resiliency._normalise_alerts(raw_alerts)
|
||||
|
||||
self.assertEqual(len(normalized), 1)
|
||||
self.assertEqual(mock_logging.warning.call_count, 3)
|
||||
|
||||
def test_normalise_alerts_with_non_list_raises_error(self):
|
||||
"""Test _normalise_alerts raises ValueError for non-list input."""
|
||||
with self.assertRaises(ValueError):
|
||||
Resiliency._normalise_alerts("not a list")
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
Resiliency._normalise_alerts({"key": "value"})
|
||||
|
||||
def test_normalise_alerts_stores_weight_none_when_absent(self):
|
||||
"""Test that alerts without a weight field store None, not 0, preserving severity fallback."""
|
||||
raw_alerts = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "no weight"},
|
||||
]
|
||||
|
||||
normalized = Resiliency._normalise_alerts(raw_alerts)
|
||||
|
||||
self.assertIsNone(normalized[0]["weight"])
|
||||
|
||||
def test_normalise_alerts_stores_custom_weight_when_present(self):
|
||||
"""Test that a numeric weight field is preserved exactly."""
|
||||
raw_alerts = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
|
||||
{"expr": "cpu > 80", "severity": "warning", "description": "slo2", "weight": 0.5},
|
||||
]
|
||||
|
||||
normalized = Resiliency._normalise_alerts(raw_alerts)
|
||||
|
||||
self.assertEqual(normalized[0]["weight"], 10)
|
||||
self.assertEqual(normalized[1]["weight"], 0.5)
|
||||
|
||||
|
||||
class TestResiliencyCalculateScore(unittest.TestCase):
|
||||
"""Test cases for calculate_score method."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
alerts_data = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1"},
|
||||
{"expr": "cpu > 80", "severity": "warning", "description": "slo2"},
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump(alerts_data, f)
|
||||
self.temp_file = f.name
|
||||
|
||||
self.res = Resiliency(alerts_yaml_path=self.temp_file)
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up temp files."""
|
||||
if os.path.exists(self.temp_file):
|
||||
os.unlink(self.temp_file)
|
||||
|
||||
def test_calculate_score_with_all_passing(self):
|
||||
"""Test calculate_score with all SLOs passing."""
|
||||
self.res._results = {"slo1": True, "slo2": True}
|
||||
score = self.res.calculate_score()
|
||||
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(self.res._score, 100)
|
||||
|
||||
def test_calculate_score_with_failures(self):
|
||||
"""Test calculate_score with some failures."""
|
||||
self.res._results = {"slo1": False, "slo2": True}
|
||||
score = self.res.calculate_score()
|
||||
|
||||
# slo1 is critical (3 pts lost), slo2 is warning (1 pt)
|
||||
# Total: 4 pts, Lost: 3 pts -> 25%
|
||||
self.assertEqual(score, 25)
|
||||
|
||||
def test_calculate_score_with_health_checks(self):
|
||||
"""Test calculate_score includes health check results."""
|
||||
self.res._results = {"slo1": True, "slo2": True}
|
||||
health_checks = {"http://service": False} # Critical, 3 pts lost
|
||||
|
||||
score = self.res.calculate_score(health_check_results=health_checks)
|
||||
|
||||
# Total: 3 + 1 + 3 = 7 pts, Lost: 3 pts -> ~57%
|
||||
self.assertEqual(score, 57)
|
||||
self.assertEqual(self.res._health_check_results, health_checks)
|
||||
|
||||
def test_calculate_score_uses_per_slo_custom_weight_from_yaml(self):
|
||||
"""Integration: per-SLO custom weight loaded from YAML is used in scoring."""
|
||||
alerts_data = [
|
||||
{"expr": "up == 0", "severity": "critical", "description": "high", "weight": 10},
|
||||
{"expr": "cpu > 80", "severity": "warning", "description": "low", "weight": 0.5},
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump(alerts_data, f)
|
||||
temp = f.name
|
||||
|
||||
try:
|
||||
res = Resiliency(alerts_yaml_path=temp)
|
||||
# "high" passes (10 pts), "low" fails (loses 0.5 pts)
|
||||
res._results = {"high": True, "low": False}
|
||||
score = res.calculate_score()
|
||||
|
||||
# Total: 10.5, Lost: 0.5 -> 95%
|
||||
self.assertEqual(score, 95)
|
||||
self.assertEqual(res._breakdown["total_points"], 10.5)
|
||||
self.assertEqual(res._breakdown["points_lost"], 0.5)
|
||||
finally:
|
||||
os.unlink(temp)
|
||||
|
||||
def test_calculate_score_stores_breakdown(self):
|
||||
"""Test that calculate_score stores the breakdown dict."""
|
||||
self.res._results = {"slo1": True, "slo2": False}
|
||||
self.res.calculate_score()
|
||||
|
||||
self.assertIsNotNone(self.res._breakdown)
|
||||
self.assertIn("passed", self.res._breakdown)
|
||||
self.assertIn("failed", self.res._breakdown)
|
||||
self.assertIn("total_points", self.res._breakdown)
|
||||
self.assertIn("points_lost", self.res._breakdown)
|
||||
|
||||
|
||||
class TestResiliencyToDict(unittest.TestCase):
|
||||
"""Test cases for to_dict method."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump([{"expr": "test", "severity": "critical"}], f)
|
||||
self.temp_file = f.name
|
||||
|
||||
self.res = Resiliency(alerts_yaml_path=self.temp_file)
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up temp files."""
|
||||
if os.path.exists(self.temp_file):
|
||||
os.unlink(self.temp_file)
|
||||
|
||||
def test_to_dict_before_calculate_raises_error(self):
|
||||
"""Test that to_dict raises error if calculate_score not called."""
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.res.to_dict()
|
||||
|
||||
def test_to_dict_returns_complete_data(self):
|
||||
"""Test that to_dict returns all expected fields."""
|
||||
self.res._results = {"slo_0": True}
|
||||
health_checks = {"health1": True}
|
||||
self.res.calculate_score(health_check_results=health_checks)
|
||||
|
||||
result = self.res.to_dict()
|
||||
|
||||
self.assertIn("score", result)
|
||||
self.assertIn("breakdown", result)
|
||||
self.assertIn("slo_results", result)
|
||||
self.assertIn("health_check_results", result)
|
||||
self.assertEqual(result["slo_results"], {"slo_0": True})
|
||||
self.assertEqual(result["health_check_results"], health_checks)
|
||||
|
||||
|
||||
class TestResiliencyScenarioReports(unittest.TestCase):
|
||||
"""Test cases for scenario-based resiliency evaluation."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump([
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
|
||||
], f)
|
||||
self.temp_file = f.name
|
||||
|
||||
self.res = Resiliency(alerts_yaml_path=self.temp_file)
|
||||
self.mock_prom = Mock()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up temp files."""
|
||||
if os.path.exists(self.temp_file):
|
||||
os.unlink(self.temp_file)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
|
||||
def test_add_scenario_report(self, mock_calc_score, mock_eval_slos):
|
||||
"""Test adding a scenario report."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
|
||||
|
||||
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
score = self.res.add_scenario_report(
|
||||
scenario_name="test_scenario",
|
||||
prom_cli=self.mock_prom,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
weight=1.5,
|
||||
)
|
||||
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(len(self.res.scenario_reports), 1)
|
||||
self.assertEqual(self.res.scenario_reports[0]["name"], "test_scenario")
|
||||
self.assertEqual(self.res.scenario_reports[0]["weight"], 1.5)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
def test_finalize_report_calculates_weighted_average(self, mock_eval_slos):
|
||||
"""Test that finalize_report calculates weighted average correctly."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
|
||||
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 1, 1, 2, 0, 0)
|
||||
|
||||
# Add two scenarios with different scores and weights
|
||||
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
|
||||
mock_calc.return_value = (80, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
|
||||
self.res.add_scenario_report(
|
||||
scenario_name="scenario1",
|
||||
prom_cli=self.mock_prom,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
weight=2,
|
||||
)
|
||||
|
||||
mock_calc.return_value = (60, {"passed": 0, "failed": 1, "total_points": 3, "points_lost": 3})
|
||||
self.res.add_scenario_report(
|
||||
scenario_name="scenario2",
|
||||
prom_cli=self.mock_prom,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
weight=1,
|
||||
)
|
||||
|
||||
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
|
||||
mock_calc.return_value = (100, {"passed": 1, "failed": 0})
|
||||
self.res.finalize_report(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=start,
|
||||
total_end_time=end,
|
||||
)
|
||||
|
||||
# Weighted average: (80*2 + 60*1) / (2+1) = 220/3 = 73.33... = 73
|
||||
self.assertEqual(self.res.summary["resiliency_score"], 73)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
def test_finalize_report_populates_summary_and_detailed(self, mock_eval_slos):
|
||||
"""Test that finalize_report sets summary and detailed_report."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
|
||||
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
|
||||
mock_calc.return_value = (95, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
|
||||
self.res.add_scenario_report(
|
||||
scenario_name="s1",
|
||||
prom_cli=self.mock_prom,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
)
|
||||
self.res.finalize_report(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=start,
|
||||
total_end_time=end,
|
||||
)
|
||||
|
||||
self.assertIsNotNone(self.res.summary)
|
||||
self.assertIn("resiliency_score", self.res.summary)
|
||||
self.assertIn("scenarios", self.res.summary)
|
||||
self.assertIsNotNone(self.res.detailed_report)
|
||||
self.assertIn("scenarios", self.res.detailed_report)
|
||||
|
||||
def test_finalize_report_without_scenarios_raises_error(self):
|
||||
"""Test that finalize_report raises error if no scenarios added."""
|
||||
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.res.finalize_report(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=start,
|
||||
total_end_time=end,
|
||||
)
|
||||
|
||||
def test_get_summary_before_finalize_raises_error(self):
|
||||
"""Test that get_summary raises RuntimeError before finalize_report is called."""
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.res.get_summary()
|
||||
|
||||
def test_get_detailed_report_before_finalize_raises_error(self):
|
||||
"""Test that get_detailed_report raises RuntimeError before finalize_report is called."""
|
||||
with self.assertRaises(RuntimeError):
|
||||
self.res.get_detailed_report()
|
||||
|
||||
|
||||
class TestResiliencyCompactBreakdown(unittest.TestCase):
|
||||
"""Test cases for compact_breakdown static method."""
|
||||
|
||||
def test_compact_breakdown_with_valid_report(self):
|
||||
"""Test compact_breakdown with valid report structure."""
|
||||
report = {
|
||||
"score": 85,
|
||||
"breakdown": {
|
||||
"passed": 8,
|
||||
"failed": 2,
|
||||
}
|
||||
}
|
||||
|
||||
result = Resiliency.compact_breakdown(report)
|
||||
|
||||
self.assertEqual(result["resiliency_score"], 85)
|
||||
self.assertEqual(result["passed_slos"], 8)
|
||||
self.assertEqual(result["total_slos"], 10)
|
||||
|
||||
def test_compact_breakdown_with_missing_fields_uses_defaults(self):
|
||||
"""Test compact_breakdown handles missing fields gracefully."""
|
||||
report = {}
|
||||
|
||||
result = Resiliency.compact_breakdown(report)
|
||||
|
||||
self.assertEqual(result["resiliency_score"], 0)
|
||||
self.assertEqual(result["passed_slos"], 0)
|
||||
self.assertEqual(result["total_slos"], 0)
|
||||
|
||||
|
||||
class TestResiliencyAddScenarioReports(unittest.TestCase):
|
||||
"""Test cases for the add_scenario_reports method."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump([
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
|
||||
], f)
|
||||
self.temp_file = f.name
|
||||
|
||||
self.res = Resiliency(alerts_yaml_path=self.temp_file)
|
||||
self.mock_prom = Mock()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up temp files."""
|
||||
if os.path.exists(self.temp_file):
|
||||
os.unlink(self.temp_file)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
|
||||
def test_add_scenario_reports_enriches_dict_telemetry(self, mock_calc_score, mock_eval_slos):
|
||||
"""Test that dict telemetry items are enriched with a resiliency_report."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
mock_calc_score.return_value = (85, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
|
||||
|
||||
telemetries = [
|
||||
{
|
||||
"scenario": "pod_scenario",
|
||||
"start_timestamp": 1609459200,
|
||||
"end_timestamp": 1609462800,
|
||||
}
|
||||
]
|
||||
|
||||
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
self.res.add_scenario_reports(
|
||||
scenario_telemetries=telemetries,
|
||||
prom_cli=self.mock_prom,
|
||||
scenario_type="default_type",
|
||||
batch_start_dt=start,
|
||||
batch_end_dt=end,
|
||||
weight=1.5,
|
||||
)
|
||||
|
||||
self.assertEqual(len(self.res.scenario_reports), 1)
|
||||
self.assertIn("resiliency_report", telemetries[0])
|
||||
self.assertIn("resiliency_score", telemetries[0]["resiliency_report"])
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
|
||||
def test_add_scenario_reports_uses_batch_times_when_timestamps_missing(self, mock_calc_score, mock_eval_slos):
|
||||
"""Test that batch times are used when telemetry has no timestamps."""
|
||||
mock_eval_slos.return_value = {}
|
||||
mock_calc_score.return_value = (0, {"passed": 0, "failed": 0, "total_points": 0, "points_lost": 0})
|
||||
|
||||
telemetries = [{"scenario": "my_scenario"}]
|
||||
start = datetime.datetime(2025, 6, 1, 0, 0, 0)
|
||||
end = datetime.datetime(2025, 6, 1, 1, 0, 0)
|
||||
|
||||
self.res.add_scenario_reports(
|
||||
scenario_telemetries=telemetries,
|
||||
prom_cli=self.mock_prom,
|
||||
scenario_type="fallback_type",
|
||||
batch_start_dt=start,
|
||||
batch_end_dt=end,
|
||||
)
|
||||
|
||||
# evaluate_slos should have been called with the batch times
|
||||
call_kwargs = mock_eval_slos.call_args[1]
|
||||
self.assertEqual(call_kwargs["start_time"], start)
|
||||
self.assertEqual(call_kwargs["end_time"], end)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
|
||||
def test_add_scenario_reports_uses_scenario_name_from_telemetry(self, mock_calc_score, mock_eval_slos):
|
||||
"""Test that scenario name is taken from telemetry, not the fallback type."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
|
||||
|
||||
telemetries = [{"scenario": "real_scenario_name"}]
|
||||
|
||||
self.res.add_scenario_reports(
|
||||
scenario_telemetries=telemetries,
|
||||
prom_cli=self.mock_prom,
|
||||
scenario_type="fallback_type",
|
||||
batch_start_dt=datetime.datetime(2025, 1, 1),
|
||||
batch_end_dt=datetime.datetime(2025, 1, 2),
|
||||
)
|
||||
|
||||
self.assertEqual(self.res.scenario_reports[0]["name"], "real_scenario_name")
|
||||
|
||||
|
||||
class TestFinalizeAndSave(unittest.TestCase):
|
||||
"""Test cases for finalize_and_save method."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures with a pre-populated scenario report."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
|
||||
import yaml
|
||||
yaml.dump([
|
||||
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
|
||||
], f)
|
||||
self.temp_file = f.name
|
||||
|
||||
self.res = Resiliency(alerts_yaml_path=self.temp_file)
|
||||
self.mock_prom = Mock()
|
||||
self.start = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
self.end = datetime.datetime(2025, 1, 1, 2, 0, 0)
|
||||
|
||||
# Pre-populate a scenario report so finalize_report doesn't raise
|
||||
self.res.scenario_reports = [
|
||||
{
|
||||
"name": "test_scenario",
|
||||
"window": {"start": self.start.isoformat(), "end": self.end.isoformat()},
|
||||
"score": 90,
|
||||
"weight": 1,
|
||||
"breakdown": {"total_points": 3, "points_lost": 0, "passed": 1, "failed": 0},
|
||||
"slo_results": {"slo1": True},
|
||||
"health_check_results": {},
|
||||
}
|
||||
]
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up temp files."""
|
||||
if os.path.exists(self.temp_file):
|
||||
os.unlink(self.temp_file)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
def test_finalize_and_save_standalone_writes_detailed_file(self, mock_eval_slos):
|
||||
"""Test that standalone mode writes a detailed JSON report to the given path."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
detailed_path = os.path.join(tmpdir, "resiliency-report.json")
|
||||
|
||||
self.res.finalize_and_save(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=self.start,
|
||||
total_end_time=self.end,
|
||||
run_mode="standalone",
|
||||
detailed_path=detailed_path,
|
||||
)
|
||||
|
||||
self.assertTrue(os.path.exists(detailed_path))
|
||||
with open(detailed_path) as fp:
|
||||
report = json.load(fp)
|
||||
self.assertIn("scenarios", report)
|
||||
|
||||
@patch('builtins.print')
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
def test_finalize_and_save_controller_mode_prints_to_stdout(self, mock_eval_slos, mock_print):
|
||||
"""Test that controller mode prints the detailed report to stdout with the expected prefix."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
|
||||
self.res.finalize_and_save(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=self.start,
|
||||
total_end_time=self.end,
|
||||
run_mode="controller",
|
||||
)
|
||||
|
||||
mock_print.assert_called()
|
||||
call_args = str(mock_print.call_args)
|
||||
self.assertIn("KRKN_RESILIENCY_REPORT_JSON", call_args)
|
||||
|
||||
@patch('krkn.resiliency.resiliency.evaluate_slos')
|
||||
def test_finalize_and_save_populates_summary_after_call(self, mock_eval_slos):
|
||||
"""Test that finalize_and_save populates summary so get_summary works afterward."""
|
||||
mock_eval_slos.return_value = {"slo1": True}
|
||||
|
||||
self.res.finalize_and_save(
|
||||
prom_cli=self.mock_prom,
|
||||
total_start_time=self.start,
|
||||
total_end_time=self.end,
|
||||
)
|
||||
|
||||
summary = self.res.get_summary()
|
||||
self.assertIsNotNone(summary)
|
||||
self.assertIn("resiliency_score", summary)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
409
tests/test_resiliency_score.py
Normal file
409
tests/test_resiliency_score.py
Normal file
@@ -0,0 +1,409 @@
|
||||
"""
|
||||
Tests for krkn.resiliency.score module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_resiliency_score
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_resiliency_score -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_resiliency_score.TestSLOResult
|
||||
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_resiliency_score.TestSLOResult.test_slo_result_initialization
|
||||
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore.test_all_slos_passing_returns_100
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_resiliency_score
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from krkn.resiliency.score import (
|
||||
SLOResult,
|
||||
calculate_resiliency_score,
|
||||
DEFAULT_WEIGHTS,
|
||||
)
|
||||
|
||||
|
||||
class TestSLOResult(unittest.TestCase):
|
||||
"""Test cases for the SLOResult class."""
|
||||
|
||||
def test_slo_result_initialization(self):
|
||||
"""Test SLOResult object initialization."""
|
||||
slo = SLOResult(name="test_slo", severity="critical", passed=True)
|
||||
self.assertEqual(slo.name, "test_slo")
|
||||
self.assertEqual(slo.severity, "critical")
|
||||
self.assertTrue(slo.passed)
|
||||
|
||||
def test_slo_result_weight_critical_default(self):
|
||||
"""Test weight calculation for critical SLO with default weights."""
|
||||
slo = SLOResult(name="test_slo", severity="critical", passed=True)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 3)
|
||||
|
||||
def test_slo_result_weight_warning_default(self):
|
||||
"""Test weight calculation for warning SLO with default weights."""
|
||||
slo = SLOResult(name="test_slo", severity="warning", passed=True)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 1)
|
||||
|
||||
def test_slo_result_weight_custom_severity_weights(self):
|
||||
"""Test weight calculation with custom severity-level weights."""
|
||||
custom_weights = {"critical": 5, "warning": 2}
|
||||
slo_critical = SLOResult(name="test1", severity="critical", passed=True)
|
||||
slo_warning = SLOResult(name="test2", severity="warning", passed=True)
|
||||
|
||||
self.assertEqual(slo_critical.weight(custom_weights), 5)
|
||||
self.assertEqual(slo_warning.weight(custom_weights), 2)
|
||||
|
||||
def test_slo_result_weight_unknown_severity_falls_back_to_warning(self):
|
||||
"""Test that unknown severity falls back to warning weight."""
|
||||
slo = SLOResult(name="test_slo", severity="unknown", passed=True)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])
|
||||
|
||||
def test_slo_result_custom_weight_overrides_severity(self):
|
||||
"""Test that a per-SLO custom weight overrides the severity-based weight."""
|
||||
slo = SLOResult(name="test_slo", severity="critical", passed=True, weight=10)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 10)
|
||||
|
||||
def test_slo_result_custom_weight_zero_is_valid(self):
|
||||
"""Test that a per-SLO weight of 0 is respected."""
|
||||
slo = SLOResult(name="test_slo", severity="critical", passed=False, weight=0)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 0)
|
||||
|
||||
def test_slo_result_explicit_none_weight_falls_back_to_severity(self):
|
||||
"""Test that weight=None explicitly falls back to severity-based weight, not 0."""
|
||||
slo = SLOResult(name="test_slo", severity="critical", passed=True, weight=None)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 3)
|
||||
|
||||
def test_slo_result_float_custom_weight(self):
|
||||
"""Test that a fractional custom weight (e.g. 0.5 as documented) is returned as-is."""
|
||||
slo = SLOResult(name="test_slo", severity="warning", passed=True, weight=0.5)
|
||||
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 0.5)
|
||||
|
||||
|
||||
class TestCalculateResiliencyScore(unittest.TestCase):
|
||||
"""Test cases for the calculate_resiliency_score function."""
|
||||
|
||||
def test_all_slos_passing_returns_100(self):
|
||||
"""Test that all passing SLOs returns score of 100."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
"slo2": True,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
self.assertEqual(breakdown["points_lost"], 0)
|
||||
|
||||
def test_all_slos_failing_returns_0(self):
|
||||
"""Test that all failing SLOs returns score of 0."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": False,
|
||||
"slo2": False,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(score, 0)
|
||||
self.assertEqual(breakdown["passed"], 0)
|
||||
self.assertEqual(breakdown["failed"], 2)
|
||||
|
||||
def test_mixed_results_calculates_correct_score(self):
|
||||
"""Test score calculation with mixed pass/fail results."""
|
||||
slo_definitions = {
|
||||
"slo_critical": "critical", # weight=3
|
||||
"slo_warning": "warning", # weight=1
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_critical": True, # 3 points
|
||||
"slo_warning": False, # 0 points (lost 1)
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 4 points, Lost: 1 point
|
||||
# Score: (4-1)/4 * 100 = 75%
|
||||
self.assertEqual(score, 75)
|
||||
self.assertEqual(breakdown["total_points"], 4)
|
||||
self.assertEqual(breakdown["points_lost"], 1)
|
||||
self.assertEqual(breakdown["passed"], 1)
|
||||
self.assertEqual(breakdown["failed"], 1)
|
||||
|
||||
def test_slo_not_in_prometheus_results_is_excluded(self):
|
||||
"""Test that SLOs not in prometheus_results are excluded from calculation."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
"slo3": "critical", # Not in prometheus_results
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
"slo2": True,
|
||||
# slo3 is missing (no data)
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Only slo1 and slo2 should be counted
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
|
||||
def test_health_checks_are_treated_as_critical(self):
|
||||
"""Test that health checks are always weighted as critical."""
|
||||
slo_definitions = {}
|
||||
prometheus_results = {}
|
||||
health_check_results = {
|
||||
"http://service1": True,
|
||||
"http://service2": False,
|
||||
}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# 2 health checks, each critical (weight=3)
|
||||
# Total: 6 points, Lost: 3 points (one failed)
|
||||
# Score: (6-3)/6 * 100 = 50%
|
||||
self.assertEqual(score, 50)
|
||||
self.assertEqual(breakdown["total_points"], 6)
|
||||
self.assertEqual(breakdown["points_lost"], 3)
|
||||
|
||||
def test_combined_slos_and_health_checks(self):
|
||||
"""Test calculation with both SLOs and health checks."""
|
||||
slo_definitions = {
|
||||
"slo1": "warning", # weight=1
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
}
|
||||
health_check_results = {
|
||||
"health1": True, # weight=3 (critical)
|
||||
"health2": False, # weight=3 (critical)
|
||||
}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 1 + 3 + 3 = 7 points
|
||||
# Lost: 3 points (health2 failed)
|
||||
# Score: (7-3)/7 * 100 = 57.14... = 57%
|
||||
self.assertEqual(score, 57)
|
||||
self.assertEqual(breakdown["total_points"], 7)
|
||||
self.assertEqual(breakdown["points_lost"], 3)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 1)
|
||||
|
||||
def test_per_slo_custom_weight_overrides_severity(self):
|
||||
"""Test that per-SLO custom weight in extended format overrides default severity weight."""
|
||||
slo_definitions = {
|
||||
"slo1": {"severity": "critical", "weight": 10},
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": False,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(breakdown["total_points"], 10)
|
||||
self.assertEqual(breakdown["points_lost"], 10)
|
||||
self.assertEqual(score, 0)
|
||||
|
||||
def test_extended_format_mixed_with_legacy_format(self):
|
||||
"""Test that extended dict format and legacy string format can be mixed."""
|
||||
slo_definitions = {
|
||||
"slo_custom": {"severity": "warning", "weight": 5}, # custom weight
|
||||
"slo_legacy": "critical", # legacy, weight=3
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_custom": False, # loses 5 pts
|
||||
"slo_legacy": True, # keeps 3 pts
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 5 + 3 = 8, Lost: 5
|
||||
# Score: (8-5)/8 * 100 = 37.5 -> 37
|
||||
self.assertEqual(breakdown["total_points"], 8)
|
||||
self.assertEqual(breakdown["points_lost"], 5)
|
||||
self.assertEqual(score, 37)
|
||||
|
||||
def test_extended_format_weight_none_falls_back_to_severity(self):
|
||||
"""Test that weight=None in extended dict format falls back to severity-based weight."""
|
||||
slo_definitions = {
|
||||
"slo1": {"severity": "critical", "weight": None}, # should use default critical weight=3
|
||||
}
|
||||
prometheus_results = {"slo1": False}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# weight falls back to critical=3
|
||||
self.assertEqual(breakdown["total_points"], 3)
|
||||
self.assertEqual(breakdown["points_lost"], 3)
|
||||
self.assertEqual(score, 0)
|
||||
|
||||
def test_float_custom_weight_scoring(self):
|
||||
"""Test scoring with fractional weights as documented (e.g. weight: 0.5)."""
|
||||
slo_definitions = {
|
||||
"slo_high": {"severity": "critical", "weight": 10},
|
||||
"slo_low": {"severity": "warning", "weight": 0.5},
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_high": True, # keeps 10 pts
|
||||
"slo_low": False, # loses 0.5 pts
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 10.5, Lost: 0.5 -> (10/10.5)*100 = 95.23... -> 95
|
||||
self.assertEqual(breakdown["total_points"], 10.5)
|
||||
self.assertEqual(breakdown["points_lost"], 0.5)
|
||||
self.assertEqual(score, 95)
|
||||
|
||||
def test_failed_slo_with_zero_weight_does_not_affect_score(self):
|
||||
"""Test that a failing SLO with weight=0 contributes nothing to points_lost."""
|
||||
slo_definitions = {
|
||||
"slo_zero": {"severity": "critical", "weight": 0},
|
||||
"slo_normal": "warning", # weight=1
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_zero": False, # fails but contributes 0 pts
|
||||
"slo_normal": True,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(breakdown["total_points"], 1)
|
||||
self.assertEqual(breakdown["points_lost"], 0)
|
||||
self.assertEqual(score, 100)
|
||||
|
||||
def test_all_custom_weight_slos_passing_returns_100(self):
|
||||
"""Test that all custom-weight SLOs passing returns 100 regardless of weight values."""
|
||||
slo_definitions = {
|
||||
"slo1": {"severity": "critical", "weight": 20},
|
||||
"slo2": {"severity": "warning", "weight": 5},
|
||||
"slo3": {"severity": "critical", "weight": 0.5},
|
||||
}
|
||||
prometheus_results = {"slo1": True, "slo2": True, "slo3": True}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(breakdown["points_lost"], 0)
|
||||
self.assertEqual(breakdown["passed"], 3)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
|
||||
def test_empty_slo_definitions_returns_zero_score(self):
|
||||
"""Test that empty SLO definitions returns score of 0."""
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions={},
|
||||
prometheus_results={},
|
||||
health_check_results={}
|
||||
)
|
||||
|
||||
self.assertEqual(score, 0)
|
||||
self.assertEqual(breakdown["total_points"], 0)
|
||||
self.assertEqual(breakdown["points_lost"], 0)
|
||||
self.assertEqual(breakdown["passed"], 0)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
|
||||
def test_prometheus_results_coerced_to_bool(self):
|
||||
"""Test that prometheus results are properly coerced to boolean."""
|
||||
slo_definitions = {
|
||||
"slo1": "warning",
|
||||
"slo2": "warning",
|
||||
"slo3": "warning",
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": 1, # Truthy
|
||||
"slo2": 0, # Falsy
|
||||
"slo3": None, # Falsy
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# slo1 passes (1 point), slo2 and slo3 fail (0 points each)
|
||||
# Total: 3 points, Lost: 2 points
|
||||
# Score: (3-2)/3 * 100 = 33.33... = 33%
|
||||
self.assertEqual(score, 33)
|
||||
self.assertEqual(breakdown["passed"], 1)
|
||||
self.assertEqual(breakdown["failed"], 2)
|
||||
|
||||
def test_score_calculation_rounds_down(self):
|
||||
"""Test that score calculation rounds down to integer."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical", # 3 points
|
||||
"slo2": "critical", # 3 points
|
||||
"slo3": "critical", # 3 points
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True, # 3 points
|
||||
"slo2": True, # 3 points
|
||||
"slo3": False, # 0 points (lost 3)
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 9 points, Lost: 3 points
|
||||
# Score: (9-3)/9 * 100 = 66.666... -> 66
|
||||
self.assertEqual(score, 66)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user