mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-03-21 02:47:06 +00:00
Compare commits
10 Commits
v5.0.1-bet
...
v5.0.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3db5e1abbe | ||
|
|
1e699c6cc9 | ||
|
|
0ebda3e101 | ||
|
|
8a5be0dd2f | ||
|
|
62dadfe25c | ||
|
|
cb368a2f5c | ||
|
|
bb636cd3a9 | ||
|
|
f241b2b62f | ||
|
|
2a60a519cd | ||
|
|
31756e6d9b |
129
.github/workflows/docker-image.yml
vendored
129
.github/workflows/docker-image.yml
vendored
@@ -6,48 +6,117 @@ on:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ${{ matrix.runner }}
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- platform: amd64
|
||||
runner: ubuntu-latest
|
||||
- platform: arm64
|
||||
runner: ubuntu-24.04-arm
|
||||
steps:
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v3
|
||||
- name: Build the Docker images
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
run: |
|
||||
./containers/compile_dockerfile.sh
|
||||
docker build --no-cache -t quay.io/krkn-chaos/krkn containers/ --build-arg TAG=${GITHUB_REF#refs/tags/}
|
||||
docker tag quay.io/krkn-chaos/krkn quay.io/redhat-chaos/krkn
|
||||
docker tag quay.io/krkn-chaos/krkn quay.io/krkn-chaos/krkn:${GITHUB_REF#refs/tags/}
|
||||
docker tag quay.io/krkn-chaos/krkn quay.io/redhat-chaos/krkn:${GITHUB_REF#refs/tags/}
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Test Build the Docker images
|
||||
if: ${{ github.event_name == 'pull_request' }}
|
||||
if: github.event_name == 'pull_request'
|
||||
run: |
|
||||
./containers/compile_dockerfile.sh
|
||||
docker build --no-cache -t quay.io/krkn-chaos/krkn containers/ --build-arg PR_NUMBER=${{ github.event.pull_request.number }}
|
||||
- name: Login in quay
|
||||
docker buildx build --no-cache \
|
||||
--platform linux/${{ matrix.platform }} \
|
||||
-t quay.io/krkn-chaos/krkn \
|
||||
-t quay.io/redhat-chaos/krkn \
|
||||
containers/ \
|
||||
--build-arg PR_NUMBER=${{ github.event.pull_request.number }}
|
||||
|
||||
- name: Login to krkn-chaos quay
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
run: docker login quay.io -u ${QUAY_USER} -p ${QUAY_TOKEN}
|
||||
env:
|
||||
QUAY_USER: ${{ secrets.QUAY_USERNAME }}
|
||||
QUAY_TOKEN: ${{ secrets.QUAY_PASSWORD }}
|
||||
- name: Push the KrknChaos Docker images
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: quay.io
|
||||
username: ${{ secrets.QUAY_USERNAME }}
|
||||
password: ${{ secrets.QUAY_PASSWORD }}
|
||||
|
||||
- name: Build and push krkn-chaos images
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
run: |
|
||||
docker push quay.io/krkn-chaos/krkn
|
||||
docker push quay.io/krkn-chaos/krkn:${GITHUB_REF#refs/tags/}
|
||||
- name: Login in to redhat-chaos quay
|
||||
if: startsWith(github.ref, 'refs/tags/v')
|
||||
run: docker login quay.io -u ${QUAY_USER} -p ${QUAY_TOKEN}
|
||||
env:
|
||||
QUAY_USER: ${{ secrets.QUAY_USER_1 }}
|
||||
QUAY_TOKEN: ${{ secrets.QUAY_TOKEN_1 }}
|
||||
- name: Push the RedHat Chaos Docker images
|
||||
./containers/compile_dockerfile.sh
|
||||
TAG=${GITHUB_REF#refs/tags/}
|
||||
docker buildx build --no-cache \
|
||||
--platform linux/${{ matrix.platform }} \
|
||||
--provenance=false \
|
||||
-t quay.io/krkn-chaos/krkn:latest-${{ matrix.platform }} \
|
||||
-t quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} \
|
||||
containers/ \
|
||||
--build-arg TAG=${TAG} \
|
||||
--push --load
|
||||
|
||||
- name: Login to redhat-chaos quay
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
run: |
|
||||
docker push quay.io/redhat-chaos/krkn
|
||||
docker push quay.io/redhat-chaos/krkn:${GITHUB_REF#refs/tags/}
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: quay.io
|
||||
username: ${{ secrets.QUAY_USER_1 }}
|
||||
password: ${{ secrets.QUAY_TOKEN_1 }}
|
||||
|
||||
- name: Push redhat-chaos images
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/tags/}
|
||||
docker tag quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} quay.io/redhat-chaos/krkn:${TAG}-${{ matrix.platform }}
|
||||
docker tag quay.io/krkn-chaos/krkn:${TAG}-${{ matrix.platform }} quay.io/redhat-chaos/krkn:latest-${{ matrix.platform }}
|
||||
docker push quay.io/redhat-chaos/krkn:${TAG}-${{ matrix.platform }}
|
||||
docker push quay.io/redhat-chaos/krkn:latest-${{ matrix.platform }}
|
||||
|
||||
manifest:
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
steps:
|
||||
- name: Login to krkn-chaos quay
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: quay.io
|
||||
username: ${{ secrets.QUAY_USERNAME }}
|
||||
password: ${{ secrets.QUAY_PASSWORD }}
|
||||
|
||||
- name: Create and push KrknChaos manifests
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/tags/}
|
||||
docker manifest create quay.io/krkn-chaos/krkn:${TAG} \
|
||||
quay.io/krkn-chaos/krkn:${TAG}-amd64 \
|
||||
quay.io/krkn-chaos/krkn:${TAG}-arm64
|
||||
docker manifest push quay.io/krkn-chaos/krkn:${TAG}
|
||||
|
||||
docker manifest create quay.io/krkn-chaos/krkn:latest \
|
||||
quay.io/krkn-chaos/krkn:latest-amd64 \
|
||||
quay.io/krkn-chaos/krkn:latest-arm64
|
||||
docker manifest push quay.io/krkn-chaos/krkn:latest
|
||||
|
||||
- name: Login to redhat-chaos quay
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: quay.io
|
||||
username: ${{ secrets.QUAY_USER_1 }}
|
||||
password: ${{ secrets.QUAY_TOKEN_1 }}
|
||||
|
||||
- name: Create and push RedHat Chaos manifests
|
||||
run: |
|
||||
TAG=${GITHUB_REF#refs/tags/}
|
||||
docker manifest create quay.io/redhat-chaos/krkn:${TAG} \
|
||||
quay.io/redhat-chaos/krkn:${TAG}-amd64 \
|
||||
quay.io/redhat-chaos/krkn:${TAG}-arm64
|
||||
docker manifest push quay.io/redhat-chaos/krkn:${TAG}
|
||||
|
||||
docker manifest create quay.io/redhat-chaos/krkn:latest \
|
||||
quay.io/redhat-chaos/krkn:latest-amd64 \
|
||||
quay.io/redhat-chaos/krkn:latest-arm64
|
||||
docker manifest push quay.io/redhat-chaos/krkn:latest
|
||||
|
||||
- name: Rebuild krkn-hub
|
||||
if: startsWith(github.ref, 'refs/tags')
|
||||
uses: redhat-chaos/actions/krkn-hub@main
|
||||
with:
|
||||
QUAY_USER: ${{ secrets.QUAY_USERNAME }}
|
||||
|
||||
58
.github/workflows/require-docs.yml
vendored
58
.github/workflows/require-docs.yml
vendored
@@ -9,37 +9,47 @@ jobs:
|
||||
name: Check Documentation Update
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check if Documentation is Required
|
||||
id: check_docs
|
||||
run: |
|
||||
echo "Checking PR body for documentation checkbox..."
|
||||
# Read the PR body from the GitHub event payload
|
||||
if echo "${{ github.event.pull_request.body }}" | grep -qi '\[x\].*documentation needed'; then
|
||||
# Read PR body from the event JSON file — never from shell interpolation.
|
||||
# jq handles all escaping; the shell never sees the user-controlled string.
|
||||
if jq -r '.pull_request.body // ""' "$GITHUB_EVENT_PATH" | \
|
||||
grep -qi '\[x\].*documentation needed'; then
|
||||
echo "Documentation required detected."
|
||||
echo "docs_required=true" >> $GITHUB_OUTPUT
|
||||
echo "docs_required=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "Documentation not required."
|
||||
echo "docs_required=false" >> $GITHUB_OUTPUT
|
||||
echo "docs_required=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Enforce Documentation Update (if required)
|
||||
if: steps.check_docs.outputs.docs_required == 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
# Retrieve feature branch and repository owner from the GitHub context
|
||||
FEATURE_BRANCH="${{ github.head_ref }}"
|
||||
REPO_OWNER="${{ github.repository_owner }}"
|
||||
WEBSITE_REPO="website"
|
||||
echo "Searching for a merged documentation PR for feature branch: $FEATURE_BRANCH in $REPO_OWNER/$WEBSITE_REPO..."
|
||||
MERGED_PR=$(gh pr list --repo "$REPO_OWNER/$WEBSITE_REPO" --state merged --json headRefName,title,url | jq -r \
|
||||
--arg FEATURE_BRANCH "$FEATURE_BRANCH" '.[] | select(.title | contains($FEATURE_BRANCH)) | .url')
|
||||
if [[ -z "$MERGED_PR" ]]; then
|
||||
echo ":x: Documentation PR for branch '$FEATURE_BRANCH' is required and has not been merged."
|
||||
exit 1
|
||||
else
|
||||
echo ":white_check_mark: Found merged documentation PR: $MERGED_PR"
|
||||
fi
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const featureBranch = context.payload.pull_request.head.ref;
|
||||
const repoOwner = context.repo.owner;
|
||||
const websiteRepo = 'website';
|
||||
|
||||
core.info(`Searching for a merged documentation PR for feature branch: ${featureBranch} in ${repoOwner}/${websiteRepo}...`);
|
||||
|
||||
const { data: pulls } = await github.rest.pulls.list({
|
||||
owner: repoOwner,
|
||||
repo: websiteRepo,
|
||||
state: 'closed',
|
||||
per_page: 100,
|
||||
});
|
||||
|
||||
const mergedPr = pulls.find(
|
||||
(pr) => pr.merged_at && pr.title.includes(featureBranch)
|
||||
);
|
||||
|
||||
if (!mergedPr) {
|
||||
core.setFailed(
|
||||
`❌ Documentation PR for branch '${featureBranch}' is required and has not been merged.`
|
||||
);
|
||||
} else {
|
||||
core.info(`✅ Found merged documentation PR: ${mergedPr.html_url}`);
|
||||
}
|
||||
141
BETA_FEATURE_POLICY.md
Normal file
141
BETA_FEATURE_POLICY.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# Beta Features Policy
|
||||
|
||||
## Overview
|
||||
|
||||
Beta features provide users early access to new capabilities before they reach full stability and general availability (GA). These features allow maintainers to gather feedback, validate usability, and improve functionality based on real-world usage.
|
||||
|
||||
Beta features are intended for experimentation and evaluation. While they are functional, they may not yet meet the stability, performance, or backward compatibility guarantees expected from generally available features.
|
||||
|
||||
---
|
||||
|
||||
## What is a Beta Feature
|
||||
|
||||
A **Beta feature** is a feature that is released for user evaluation but is still under active development and refinement.
|
||||
|
||||
Beta features may have the following characteristics:
|
||||
|
||||
- Functionally usable but still evolving
|
||||
- APIs or behavior may change between releases
|
||||
- Performance optimizations may still be in progress
|
||||
- Documentation may be limited or evolving
|
||||
- Edge cases may not be fully validated
|
||||
|
||||
Beta features should be considered **experimental and optional**.
|
||||
|
||||
---
|
||||
|
||||
## User Expectations
|
||||
|
||||
Users trying Beta features should understand the following:
|
||||
|
||||
- Stability is not guaranteed
|
||||
- APIs and functionality may change without notice
|
||||
- Backward compatibility is not guaranteed
|
||||
- The feature may evolve significantly before GA
|
||||
- Production use should be evaluated carefully
|
||||
|
||||
We strongly encourage users to provide feedback to help improve the feature before it becomes generally available.
|
||||
|
||||
---
|
||||
|
||||
## Beta Feature Identification
|
||||
|
||||
All Beta features are clearly identified to ensure transparency.
|
||||
|
||||
### In Release Notes
|
||||
|
||||
Beta features will be marked with a **[BETA]** tag.
|
||||
|
||||
Example: [BETA] Krkn Resilience Score
|
||||
|
||||
|
||||
### In Documentation
|
||||
|
||||
Beta features will include a notice similar to:
|
||||
|
||||
> **Beta Feature**
|
||||
> This feature is currently in Beta and is intended for early user feedback. Behavior, APIs, and stability may change in future releases.
|
||||
|
||||
---
|
||||
|
||||
## Feature Lifecycle
|
||||
|
||||
Features typically progress through the following lifecycle stages.
|
||||
|
||||
### 1. Development
|
||||
The feature is under active development and may not yet be visible to users.
|
||||
|
||||
### 2. Beta
|
||||
The feature is released for early adoption and feedback.
|
||||
|
||||
Characteristics:
|
||||
|
||||
- Feature is usable
|
||||
- Feedback is encouraged
|
||||
- Stability improvements are ongoing
|
||||
|
||||
### 3. Stabilization
|
||||
Based on user feedback and testing, the feature is improved to meet stability and usability expectations.
|
||||
|
||||
### 4. General Availability (GA)
|
||||
|
||||
The feature is considered stable and production-ready.
|
||||
|
||||
GA features provide:
|
||||
|
||||
- Stable APIs
|
||||
- Backward compatibility guarantees
|
||||
- Complete documentation
|
||||
- Full CI test coverage
|
||||
|
||||
---
|
||||
|
||||
## Promotion to General Availability
|
||||
|
||||
A Beta feature may be promoted to GA once the following criteria are met:
|
||||
|
||||
- Critical bugs are resolved
|
||||
- Feature stability has improved through testing
|
||||
- APIs and behavior are stable
|
||||
- Documentation is complete
|
||||
- Community feedback has been incorporated
|
||||
|
||||
The promotion will be announced in the release notes.
|
||||
|
||||
Example: Feature promoted from Beta to GA
|
||||
|
||||
|
||||
---
|
||||
|
||||
## Deprecation of Beta Features
|
||||
|
||||
In some cases, a Beta feature may be redesigned or discontinued.
|
||||
|
||||
If this happens:
|
||||
|
||||
- The feature will be marked as **Deprecated**
|
||||
- A removal timeline will be provided
|
||||
- Alternative approaches will be documented when possible
|
||||
|
||||
Example: [DEPRECATED] This feature will be removed in a future release.
|
||||
|
||||
---
|
||||
|
||||
## Contributing Feedback
|
||||
User feedback plays a critical role in improving Beta features.
|
||||
|
||||
Users are encouraged to report:
|
||||
|
||||
- Bugs
|
||||
- Usability issues
|
||||
- Performance concerns
|
||||
- Feature suggestions
|
||||
|
||||
Feedback can be submitted through:
|
||||
|
||||
- Krkn GitHub Issues
|
||||
- Krkn GitHub Discussions
|
||||
- Krkn Community channels
|
||||
|
||||
Please include **Beta feature context** when reporting issues.
|
||||
Your feedback helps guide the roadmap and ensures features are production-ready before GA.
|
||||
@@ -55,6 +55,10 @@ kraken:
|
||||
- kubevirt_vm_outage:
|
||||
- scenarios/kubevirt/kubevirt-vm-outage.yaml
|
||||
|
||||
resiliency:
|
||||
resiliency_run_mode: standalone # Options: standalone, controller, disabled
|
||||
resiliency_file: config/alerts.yaml # Path to SLO definitions, will resolve to performance_monitoring: alert_profile: if not specified
|
||||
|
||||
cerberus:
|
||||
cerberus_enabled: False # Enable it when cerberus is previously installed
|
||||
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal
|
||||
|
||||
@@ -163,6 +163,15 @@
|
||||
"default": "False",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "es-run-tag",
|
||||
"short_description": "Elasticsearch run tag",
|
||||
"description": "Elasticsearch run tag to compare similar runs",
|
||||
"variable": "ES_RUN_TAG",
|
||||
"type": "string",
|
||||
"default": "",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "es-server",
|
||||
"short_description": "Elasticsearch instance URL",
|
||||
@@ -549,5 +558,31 @@
|
||||
"separator": ",",
|
||||
"default": "False",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "resiliency-score",
|
||||
"short_description": "Enable resiliency score calculation",
|
||||
"description": "The system outputs a detailed resiliency score as a single-line JSON object, facilitating easy aggregation across multiple test scenarios.",
|
||||
"variable": "RESILIENCY_SCORE",
|
||||
"type": "boolean",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "disable-resiliency-score",
|
||||
"short_description": "Disable resiliency score calculation",
|
||||
"description": "Disable resiliency score calculation",
|
||||
"variable": "DISABLE_RESILIENCY_SCORE",
|
||||
"type": "boolean",
|
||||
"required": "false"
|
||||
},
|
||||
{
|
||||
"name": "resiliency-file",
|
||||
"short_description": "Resiliency Score metrics file",
|
||||
"description": "Custom Resiliency score file",
|
||||
"variable": "RESILIENCY_FILE",
|
||||
"type": "file",
|
||||
"required": "false",
|
||||
"mount_path": "/home/krkn/resiliency-file.yaml"
|
||||
}
|
||||
|
||||
]
|
||||
@@ -320,7 +320,7 @@ class Resiliency:
|
||||
)
|
||||
detailed = self.get_detailed_report()
|
||||
|
||||
if run_mode == "controller":
|
||||
if run_mode == "detailed":
|
||||
# krknctl expects the detailed report on stdout in a special format
|
||||
try:
|
||||
detailed_json = json.dumps(detailed)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
duration: 60
|
||||
duration: 10
|
||||
workers: '' # leave it empty '' node cpu auto-detection
|
||||
hog-type: cpu
|
||||
image: quay.io/krkn-chaos/krkn-hog
|
||||
|
||||
401
tests/test_prometheus_collector.py
Normal file
401
tests/test_prometheus_collector.py
Normal file
@@ -0,0 +1,401 @@
|
||||
"""
|
||||
Tests for krkn.prometheus.collector module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_prometheus_collector
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_prometheus_collector -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_prometheus_collector.TestSLOPassed
|
||||
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_prometheus_collector.TestSLOPassed.test_empty_result_returns_none
|
||||
python -m unittest tests.test_prometheus_collector.TestEvaluateSLOs.test_evaluate_single_slo_passing
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_prometheus_collector
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
from krkn.prometheus.collector import slo_passed, evaluate_slos
|
||||
|
||||
|
||||
class TestSLOPassed(unittest.TestCase):
|
||||
"""Test cases for the slo_passed function."""
|
||||
|
||||
def test_empty_result_returns_none(self):
|
||||
"""Test that an empty result list returns None."""
|
||||
result = slo_passed([])
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_result_with_values_all_zero_returns_true(self):
|
||||
"""Test that all zero values in 'values' returns True."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
[1234567892, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_values_containing_nonzero_returns_false(self):
|
||||
"""Test that any non-zero value in 'values' returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "1.5"], # Non-zero value
|
||||
[1234567892, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_single_value_zero_returns_true(self):
|
||||
"""Test that a single 'value' field with zero returns True."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "0"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_single_value_nonzero_returns_false(self):
|
||||
"""Test that a single 'value' field with non-zero returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "5.2"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_no_samples_returns_none(self):
|
||||
"""Test that result with no 'values' or 'value' keys returns None."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"metric": {"job": "test"}
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_result_with_invalid_value_type_in_values(self):
|
||||
"""Test handling of invalid value types in 'values' field."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "invalid"], # Will raise ValueError
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
# Should continue processing after ValueError and find the zero
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_invalid_value_in_single_value_returns_false(self):
|
||||
"""Test that invalid value type in 'value' field returns False."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"value": [1234567890, "invalid"]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_none_value_in_values(self):
|
||||
"""Test handling of None values in 'values' field."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, None], # Will raise TypeError
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
# Should continue processing after TypeError and find the zero
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_multiple_series_first_has_nonzero(self):
|
||||
"""Test that first non-zero value in any series returns False immediately."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "2.0"], # Non-zero in first series
|
||||
]
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_result_with_float_zero(self):
|
||||
"""Test that float zero is handled correctly."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0.0"],
|
||||
[1234567891, "0.00"],
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_result_with_scientific_notation(self):
|
||||
"""Test values in scientific notation."""
|
||||
prometheus_result = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0e0"],
|
||||
[1234567891, "1e-10"], # Very small but non-zero
|
||||
]
|
||||
}
|
||||
]
|
||||
result = slo_passed(prometheus_result)
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
class TestEvaluateSLOs(unittest.TestCase):
|
||||
"""Test cases for the evaluate_slos function."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
self.mock_prom_cli = Mock()
|
||||
self.start_time = datetime.datetime(2025, 1, 1, 0, 0, 0)
|
||||
self.end_time = datetime.datetime(2025, 1, 1, 1, 0, 0)
|
||||
|
||||
def test_evaluate_single_slo_passing(self):
|
||||
"""Test evaluation of a single passing SLO."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "sum(rate(http_requests_total[5m]))"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with all zeros (passing)
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "0"],
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["test_slo"], True)
|
||||
self.mock_prom_cli.process_prom_query_in_range.assert_called_once_with(
|
||||
"sum(rate(http_requests_total[5m]))",
|
||||
start_time=self.start_time,
|
||||
end_time=self.end_time,
|
||||
)
|
||||
|
||||
def test_evaluate_single_slo_failing(self):
|
||||
"""Test evaluation of a single failing SLO."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "sum(rate(errors[5m]))"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with non-zero value (failing)
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{
|
||||
"values": [
|
||||
[1234567890, "0"],
|
||||
[1234567891, "5"], # Non-zero indicates failure
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["test_slo"], False)
|
||||
|
||||
def test_evaluate_slo_with_no_data_returns_true(self):
|
||||
"""Test that SLO with no data (None) is treated as passing."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "absent(metric)"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus response with no samples
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = []
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# No data should be treated as passing
|
||||
self.assertEqual(results["test_slo"], True)
|
||||
|
||||
def test_evaluate_slo_query_exception_returns_false(self):
|
||||
"""Test that an exception during query results in False."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "test_slo",
|
||||
"expr": "invalid_query"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock the Prometheus client to raise an exception
|
||||
self.mock_prom_cli.process_prom_query_in_range.side_effect = Exception("Query failed")
|
||||
|
||||
with patch('krkn.prometheus.collector.logging') as mock_logging:
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Exception should result in False
|
||||
self.assertEqual(results["test_slo"], False)
|
||||
mock_logging.error.assert_called_once()
|
||||
|
||||
def test_evaluate_multiple_slos(self):
|
||||
"""Test evaluation of multiple SLOs with mixed results."""
|
||||
slo_list = [
|
||||
{
|
||||
"name": "slo_pass",
|
||||
"expr": "query1"
|
||||
},
|
||||
{
|
||||
"name": "slo_fail",
|
||||
"expr": "query2"
|
||||
},
|
||||
{
|
||||
"name": "slo_no_data",
|
||||
"expr": "query3"
|
||||
}
|
||||
]
|
||||
|
||||
# Mock different responses for each query
|
||||
def mock_query_side_effect(expr, start_time, end_time):
|
||||
if expr == "query1":
|
||||
return [{"values": [[1234567890, "0"]]}]
|
||||
elif expr == "query2":
|
||||
return [{"values": [[1234567890, "1"]]}]
|
||||
else: # query3
|
||||
return []
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.side_effect = mock_query_side_effect
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results["slo_pass"], True)
|
||||
self.assertEqual(results["slo_fail"], False)
|
||||
self.assertEqual(results["slo_no_data"], True)
|
||||
self.assertEqual(len(results), 3)
|
||||
|
||||
def test_evaluate_empty_slo_list(self):
|
||||
"""Test evaluation with an empty SLO list."""
|
||||
slo_list = []
|
||||
|
||||
results = evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
self.assertEqual(results, {})
|
||||
self.mock_prom_cli.process_prom_query_in_range.assert_not_called()
|
||||
|
||||
@patch('krkn.prometheus.collector.logging')
|
||||
def test_evaluate_slos_logs_info_message(self, mock_logging):
|
||||
"""Test that evaluation logs an info message with SLO count."""
|
||||
slo_list = [
|
||||
{"name": "slo1", "expr": "query1"},
|
||||
{"name": "slo2", "expr": "query2"},
|
||||
]
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = [
|
||||
{"values": [[1234567890, "0"]]}
|
||||
]
|
||||
|
||||
evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Check that info logging was called with the expected message
|
||||
mock_logging.info.assert_called_once()
|
||||
call_args = mock_logging.info.call_args[0]
|
||||
self.assertIn("Evaluating %d SLOs", call_args[0])
|
||||
self.assertEqual(call_args[1], 2)
|
||||
|
||||
@patch('krkn.prometheus.collector.logging')
|
||||
def test_evaluate_slos_logs_debug_for_no_data(self, mock_logging):
|
||||
"""Test that no data scenario logs a debug message."""
|
||||
slo_list = [
|
||||
{"name": "test_slo", "expr": "query"}
|
||||
]
|
||||
|
||||
self.mock_prom_cli.process_prom_query_in_range.return_value = []
|
||||
|
||||
evaluate_slos(
|
||||
self.mock_prom_cli,
|
||||
slo_list,
|
||||
self.start_time,
|
||||
self.end_time
|
||||
)
|
||||
|
||||
# Check that debug logging was called
|
||||
mock_logging.debug.assert_called_once()
|
||||
call_args = mock_logging.debug.call_args[0]
|
||||
self.assertIn("no data", call_args[0])
|
||||
self.assertIn("test_slo", call_args[1])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
624
tests/test_resiliency.py
Normal file
624
tests/test_resiliency.py
Normal file
@@ -0,0 +1,624 @@
|
||||
"""
|
||||
Tests for krkn.resiliency.resiliency module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_resiliency
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_resiliency -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_resiliency.TestResiliencyInit
|
||||
python -m unittest tests.test_resiliency.TestResiliencyCalculateScore
|
||||
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_resiliency.TestResiliencyInit.test_init_from_file
|
||||
python -m unittest tests.test_resiliency.TestResiliencyScenarioReports.test_add_scenario_report
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_resiliency
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from krkn.resiliency.resiliency import Resiliency
|
||||
|
||||
|
||||
class TestResiliencyInit(unittest.TestCase):
    """Test cases for Resiliency class initialization and alert normalisation."""

    def _write_alerts_file(self, alerts):
        """Dump *alerts* to a temporary YAML file and return its path.

        Cleanup is registered via addCleanup immediately after the file is
        created, so the file is removed even if yaml.dump or the test body
        fails. This replaces the NamedTemporaryFile + try/finally/os.unlink
        boilerplate previously duplicated across the tests (which leaked the
        file if yaml.dump raised before the name was captured).
        """
        import yaml

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            path = f.name
            self.addCleanup(self._safe_unlink, path)
            yaml.dump(alerts, f)
        return path

    @staticmethod
    def _safe_unlink(path):
        """Remove *path*, tolerating the case where it is already gone."""
        if os.path.exists(path):
            os.unlink(path)

    def test_init_from_file(self):
        """Test initialization from alerts.yaml file."""
        alerts_data = [
            {"expr": "up == 0", "severity": "critical", "description": "Instance down"},
            {"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
        ]
        temp_file = self._write_alerts_file(alerts_data)

        res = Resiliency(alerts_yaml_path=temp_file)
        self.assertEqual(len(res._slos), 2)
        self.assertEqual(res._slos[0]["name"], "Instance down")
        self.assertEqual(res._slos[0]["expr"], "up == 0")
        self.assertEqual(res._slos[0]["severity"], "critical")

    def test_init_from_file_not_found_raises_error(self):
        """Test that missing alerts file raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            Resiliency(alerts_yaml_path="/nonexistent/path.yaml")

    def test_init_preserves_custom_weight_on_slo(self):
        """Test that custom weight is preserved from the alerts file."""
        alerts_data = [
            {"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
        ]
        temp_file = self._write_alerts_file(alerts_data)

        res = Resiliency(alerts_yaml_path=temp_file)
        self.assertEqual(res._slos[0]["weight"], 10)

    def test_normalise_alerts_with_valid_data(self):
        """Test _normalise_alerts with valid alert data."""
        raw_alerts = [
            {"expr": "up == 0", "severity": "critical", "description": "Down"},
            {"expr": "cpu > 80", "severity": "warning", "description": "High CPU"},
        ]

        normalized = Resiliency._normalise_alerts(raw_alerts)

        self.assertEqual(len(normalized), 2)
        self.assertEqual(normalized[0]["name"], "Down")
        self.assertEqual(normalized[1]["name"], "High CPU")

    def test_normalise_alerts_without_description_uses_index(self):
        """Test _normalise_alerts uses index as name when description missing."""
        raw_alerts = [
            {"expr": "up == 0", "severity": "critical"},
        ]

        normalized = Resiliency._normalise_alerts(raw_alerts)

        self.assertEqual(normalized[0]["name"], "slo_0")

    def test_normalise_alerts_skips_invalid_entries(self):
        """Test _normalise_alerts skips entries missing required fields."""
        raw_alerts = [
            {"expr": "up == 0", "severity": "critical"},  # Valid
            {"severity": "warning"},  # Missing expr
            {"expr": "cpu > 80"},  # Missing severity
            "invalid",  # Not a dict
        ]

        with patch('krkn.resiliency.resiliency.logging') as mock_logging:
            normalized = Resiliency._normalise_alerts(raw_alerts)

        # One valid entry survives; each of the three invalid ones is logged.
        self.assertEqual(len(normalized), 1)
        self.assertEqual(mock_logging.warning.call_count, 3)

    def test_normalise_alerts_with_non_list_raises_error(self):
        """Test _normalise_alerts raises ValueError for non-list input."""
        with self.assertRaises(ValueError):
            Resiliency._normalise_alerts("not a list")

        with self.assertRaises(ValueError):
            Resiliency._normalise_alerts({"key": "value"})

    def test_normalise_alerts_stores_weight_none_when_absent(self):
        """Test that alerts without a weight field store None, not 0, preserving severity fallback."""
        raw_alerts = [
            {"expr": "up == 0", "severity": "critical", "description": "no weight"},
        ]

        normalized = Resiliency._normalise_alerts(raw_alerts)

        self.assertIsNone(normalized[0]["weight"])

    def test_normalise_alerts_stores_custom_weight_when_present(self):
        """Test that a numeric weight field is preserved exactly."""
        raw_alerts = [
            {"expr": "up == 0", "severity": "critical", "description": "slo1", "weight": 10},
            {"expr": "cpu > 80", "severity": "warning", "description": "slo2", "weight": 0.5},
        ]

        normalized = Resiliency._normalise_alerts(raw_alerts)

        self.assertEqual(normalized[0]["weight"], 10)
        self.assertEqual(normalized[1]["weight"], 0.5)
||||
|
||||
class TestResiliencyCalculateScore(unittest.TestCase):
    """Test cases for calculate_score method."""

    def setUp(self):
        """Create an alerts file with one critical and one warning SLO."""
        import yaml

        alerts_data = [
            {"expr": "up == 0", "severity": "critical", "description": "slo1"},
            {"expr": "cpu > 80", "severity": "warning", "description": "slo2"},
        ]
        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump(alerts_data, f)
            self.temp_file = f.name
        # Register cleanup before constructing Resiliency: unittest skips
        # tearDown when setUp raises, so a plain tearDown would leak the
        # temp file if Resiliency() failed here.
        self.addCleanup(self._remove_temp_file)

        self.res = Resiliency(alerts_yaml_path=self.temp_file)

    def _remove_temp_file(self):
        """Delete the alerts temp file if it still exists."""
        if os.path.exists(self.temp_file):
            os.unlink(self.temp_file)

    def test_calculate_score_with_all_passing(self):
        """Test calculate_score with all SLOs passing."""
        self.res._results = {"slo1": True, "slo2": True}
        score = self.res.calculate_score()

        self.assertEqual(score, 100)
        self.assertEqual(self.res._score, 100)

    def test_calculate_score_with_failures(self):
        """Test calculate_score with some failures."""
        self.res._results = {"slo1": False, "slo2": True}
        score = self.res.calculate_score()

        # slo1 is critical (3 pts lost), slo2 is warning (1 pt)
        # Total: 4 pts, Lost: 3 pts -> 25%
        self.assertEqual(score, 25)

    def test_calculate_score_with_health_checks(self):
        """Test calculate_score includes health check results."""
        self.res._results = {"slo1": True, "slo2": True}
        health_checks = {"http://service": False}  # Critical, 3 pts lost

        score = self.res.calculate_score(health_check_results=health_checks)

        # Total: 3 + 1 + 3 = 7 pts, Lost: 3 pts -> ~57%
        self.assertEqual(score, 57)
        self.assertEqual(self.res._health_check_results, health_checks)

    def test_calculate_score_uses_per_slo_custom_weight_from_yaml(self):
        """Integration: per-SLO custom weight loaded from YAML is used in scoring."""
        import yaml

        alerts_data = [
            {"expr": "up == 0", "severity": "critical", "description": "high", "weight": 10},
            {"expr": "cpu > 80", "severity": "warning", "description": "low", "weight": 0.5},
        ]

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump(alerts_data, f)
            temp = f.name

        try:
            res = Resiliency(alerts_yaml_path=temp)
            # "high" passes (10 pts), "low" fails (loses 0.5 pts)
            res._results = {"high": True, "low": False}
            score = res.calculate_score()

            # Total: 10.5, Lost: 0.5 -> 95%
            self.assertEqual(score, 95)
            self.assertEqual(res._breakdown["total_points"], 10.5)
            self.assertEqual(res._breakdown["points_lost"], 0.5)
        finally:
            os.unlink(temp)

    def test_calculate_score_stores_breakdown(self):
        """Test that calculate_score stores the breakdown dict."""
        self.res._results = {"slo1": True, "slo2": False}
        self.res.calculate_score()

        self.assertIsNotNone(self.res._breakdown)
        self.assertIn("passed", self.res._breakdown)
        self.assertIn("failed", self.res._breakdown)
        self.assertIn("total_points", self.res._breakdown)
        self.assertIn("points_lost", self.res._breakdown)
|
||||
class TestResiliencyToDict(unittest.TestCase):
    """Test cases for to_dict method."""

    def setUp(self):
        """Write a single-SLO alerts file and build a Resiliency from it."""
        import yaml

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump([{"expr": "test", "severity": "critical"}], f)
            self.temp_file = f.name
        # addCleanup instead of tearDown: tearDown is skipped when setUp
        # raises (e.g. Resiliency() failing), which would leak the file.
        self.addCleanup(self._remove_temp_file)

        self.res = Resiliency(alerts_yaml_path=self.temp_file)

    def _remove_temp_file(self):
        """Delete the alerts temp file if it still exists."""
        if os.path.exists(self.temp_file):
            os.unlink(self.temp_file)

    def test_to_dict_before_calculate_raises_error(self):
        """Test that to_dict raises error if calculate_score not called."""
        with self.assertRaises(RuntimeError):
            self.res.to_dict()

    def test_to_dict_returns_complete_data(self):
        """Test that to_dict returns all expected fields."""
        self.res._results = {"slo_0": True}
        health_checks = {"health1": True}
        self.res.calculate_score(health_check_results=health_checks)

        result = self.res.to_dict()

        self.assertIn("score", result)
        self.assertIn("breakdown", result)
        self.assertIn("slo_results", result)
        self.assertIn("health_check_results", result)
        self.assertEqual(result["slo_results"], {"slo_0": True})
        self.assertEqual(result["health_check_results"], health_checks)
||||
|
||||
class TestResiliencyScenarioReports(unittest.TestCase):
    """Test cases for scenario-based resiliency evaluation."""

    def setUp(self):
        """Write a one-SLO alerts file and build Resiliency plus a mock Prometheus client."""
        import yaml

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump([
                {"expr": "up == 0", "severity": "critical", "description": "slo1"}
            ], f)
            self.temp_file = f.name
        # addCleanup instead of tearDown: tearDown is skipped when setUp
        # raises (e.g. Resiliency() failing), which would leak the file.
        self.addCleanup(self._remove_temp_file)

        self.res = Resiliency(alerts_yaml_path=self.temp_file)
        self.mock_prom = Mock()

    def _remove_temp_file(self):
        """Delete the alerts temp file if it still exists."""
        if os.path.exists(self.temp_file):
            os.unlink(self.temp_file)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    @patch('krkn.resiliency.resiliency.calculate_resiliency_score')
    def test_add_scenario_report(self, mock_calc_score, mock_eval_slos):
        """Test adding a scenario report."""
        mock_eval_slos.return_value = {"slo1": True}
        mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})

        start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        end = datetime.datetime(2025, 1, 1, 1, 0, 0)

        score = self.res.add_scenario_report(
            scenario_name="test_scenario",
            prom_cli=self.mock_prom,
            start_time=start,
            end_time=end,
            weight=1.5,
        )

        self.assertEqual(score, 100)
        self.assertEqual(len(self.res.scenario_reports), 1)
        self.assertEqual(self.res.scenario_reports[0]["name"], "test_scenario")
        self.assertEqual(self.res.scenario_reports[0]["weight"], 1.5)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    def test_finalize_report_calculates_weighted_average(self, mock_eval_slos):
        """Test that finalize_report calculates weighted average correctly."""
        mock_eval_slos.return_value = {"slo1": True}

        start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        end = datetime.datetime(2025, 1, 1, 2, 0, 0)

        # Add two scenarios with different scores and weights
        with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
            mock_calc.return_value = (80, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
            self.res.add_scenario_report(
                scenario_name="scenario1",
                prom_cli=self.mock_prom,
                start_time=start,
                end_time=end,
                weight=2,
            )

            mock_calc.return_value = (60, {"passed": 0, "failed": 1, "total_points": 3, "points_lost": 3})
            self.res.add_scenario_report(
                scenario_name="scenario2",
                prom_cli=self.mock_prom,
                start_time=start,
                end_time=end,
                weight=1,
            )

        with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
            mock_calc.return_value = (100, {"passed": 1, "failed": 0})
            self.res.finalize_report(
                prom_cli=self.mock_prom,
                total_start_time=start,
                total_end_time=end,
            )

        # Weighted average: (80*2 + 60*1) / (2+1) = 220/3 = 73.33... = 73
        self.assertEqual(self.res.summary["resiliency_score"], 73)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    def test_finalize_report_populates_summary_and_detailed(self, mock_eval_slos):
        """Test that finalize_report sets summary and detailed_report."""
        mock_eval_slos.return_value = {"slo1": True}

        start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        end = datetime.datetime(2025, 1, 1, 1, 0, 0)

        with patch('krkn.resiliency.resiliency.calculate_resiliency_score') as mock_calc:
            mock_calc.return_value = (95, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})
            self.res.add_scenario_report(
                scenario_name="s1",
                prom_cli=self.mock_prom,
                start_time=start,
                end_time=end,
            )
            self.res.finalize_report(
                prom_cli=self.mock_prom,
                total_start_time=start,
                total_end_time=end,
            )

        self.assertIsNotNone(self.res.summary)
        self.assertIn("resiliency_score", self.res.summary)
        self.assertIn("scenarios", self.res.summary)
        self.assertIsNotNone(self.res.detailed_report)
        self.assertIn("scenarios", self.res.detailed_report)

    def test_finalize_report_without_scenarios_raises_error(self):
        """Test that finalize_report raises error if no scenarios added."""
        start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        end = datetime.datetime(2025, 1, 1, 1, 0, 0)

        with self.assertRaises(RuntimeError):
            self.res.finalize_report(
                prom_cli=self.mock_prom,
                total_start_time=start,
                total_end_time=end,
            )

    def test_get_summary_before_finalize_raises_error(self):
        """Test that get_summary raises RuntimeError before finalize_report is called."""
        with self.assertRaises(RuntimeError):
            self.res.get_summary()

    def test_get_detailed_report_before_finalize_raises_error(self):
        """Test that get_detailed_report raises RuntimeError before finalize_report is called."""
        with self.assertRaises(RuntimeError):
            self.res.get_detailed_report()
||||
|
||||
class TestResiliencyCompactBreakdown(unittest.TestCase):
    """Tests for the Resiliency.compact_breakdown static method."""

    def test_compact_breakdown_with_valid_report(self):
        """A well-formed report is condensed into score / passed / total fields."""
        full_report = {
            "score": 85,
            "breakdown": {"passed": 8, "failed": 2},
        }

        compact = Resiliency.compact_breakdown(full_report)

        self.assertEqual(compact["resiliency_score"], 85)
        self.assertEqual(compact["passed_slos"], 8)
        # total = passed + failed
        self.assertEqual(compact["total_slos"], 10)

    def test_compact_breakdown_with_missing_fields_uses_defaults(self):
        """An empty report yields zeroed fields instead of raising."""
        compact = Resiliency.compact_breakdown({})

        self.assertEqual(compact["resiliency_score"], 0)
        self.assertEqual(compact["passed_slos"], 0)
        self.assertEqual(compact["total_slos"], 0)
||||
|
||||
class TestResiliencyAddScenarioReports(unittest.TestCase):
    """Test cases for the add_scenario_reports method."""

    def setUp(self):
        """Write a one-SLO alerts file and build Resiliency plus a mock Prometheus client."""
        import yaml

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump([
                {"expr": "up == 0", "severity": "critical", "description": "slo1"}
            ], f)
            self.temp_file = f.name
        # addCleanup instead of tearDown: tearDown is skipped when setUp
        # raises (e.g. Resiliency() failing), which would leak the file.
        self.addCleanup(self._remove_temp_file)

        self.res = Resiliency(alerts_yaml_path=self.temp_file)
        self.mock_prom = Mock()

    def _remove_temp_file(self):
        """Delete the alerts temp file if it still exists."""
        if os.path.exists(self.temp_file):
            os.unlink(self.temp_file)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    @patch('krkn.resiliency.resiliency.calculate_resiliency_score')
    def test_add_scenario_reports_enriches_dict_telemetry(self, mock_calc_score, mock_eval_slos):
        """Test that dict telemetry items are enriched with a resiliency_report."""
        mock_eval_slos.return_value = {"slo1": True}
        mock_calc_score.return_value = (85, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})

        telemetries = [
            {
                "scenario": "pod_scenario",
                "start_timestamp": 1609459200,
                "end_timestamp": 1609462800,
            }
        ]

        start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        end = datetime.datetime(2025, 1, 1, 1, 0, 0)

        self.res.add_scenario_reports(
            scenario_telemetries=telemetries,
            prom_cli=self.mock_prom,
            scenario_type="default_type",
            batch_start_dt=start,
            batch_end_dt=end,
            weight=1.5,
        )

        self.assertEqual(len(self.res.scenario_reports), 1)
        # The telemetry dict itself is mutated in place with the report.
        self.assertIn("resiliency_report", telemetries[0])
        self.assertIn("resiliency_score", telemetries[0]["resiliency_report"])

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    @patch('krkn.resiliency.resiliency.calculate_resiliency_score')
    def test_add_scenario_reports_uses_batch_times_when_timestamps_missing(self, mock_calc_score, mock_eval_slos):
        """Test that batch times are used when telemetry has no timestamps."""
        mock_eval_slos.return_value = {}
        mock_calc_score.return_value = (0, {"passed": 0, "failed": 0, "total_points": 0, "points_lost": 0})

        telemetries = [{"scenario": "my_scenario"}]
        start = datetime.datetime(2025, 6, 1, 0, 0, 0)
        end = datetime.datetime(2025, 6, 1, 1, 0, 0)

        self.res.add_scenario_reports(
            scenario_telemetries=telemetries,
            prom_cli=self.mock_prom,
            scenario_type="fallback_type",
            batch_start_dt=start,
            batch_end_dt=end,
        )

        # evaluate_slos should have been called with the batch times
        call_kwargs = mock_eval_slos.call_args[1]
        self.assertEqual(call_kwargs["start_time"], start)
        self.assertEqual(call_kwargs["end_time"], end)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    @patch('krkn.resiliency.resiliency.calculate_resiliency_score')
    def test_add_scenario_reports_uses_scenario_name_from_telemetry(self, mock_calc_score, mock_eval_slos):
        """Test that scenario name is taken from telemetry, not the fallback type."""
        mock_eval_slos.return_value = {"slo1": True}
        mock_calc_score.return_value = (100, {"passed": 1, "failed": 0, "total_points": 3, "points_lost": 0})

        telemetries = [{"scenario": "real_scenario_name"}]

        self.res.add_scenario_reports(
            scenario_telemetries=telemetries,
            prom_cli=self.mock_prom,
            scenario_type="fallback_type",
            batch_start_dt=datetime.datetime(2025, 1, 1),
            batch_end_dt=datetime.datetime(2025, 1, 2),
        )

        self.assertEqual(self.res.scenario_reports[0]["name"], "real_scenario_name")
||||
|
||||
class TestFinalizeAndSave(unittest.TestCase):
    """Test cases for finalize_and_save method."""

    def setUp(self):
        """Set up test fixtures with a pre-populated scenario report."""
        import yaml

        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
            yaml.dump([
                {"expr": "up == 0", "severity": "critical", "description": "slo1"}
            ], f)
            self.temp_file = f.name
        # addCleanup instead of tearDown: tearDown is skipped when setUp
        # raises (e.g. Resiliency() failing), which would leak the file.
        self.addCleanup(self._remove_temp_file)

        self.res = Resiliency(alerts_yaml_path=self.temp_file)
        self.mock_prom = Mock()
        self.start = datetime.datetime(2025, 1, 1, 0, 0, 0)
        self.end = datetime.datetime(2025, 1, 1, 2, 0, 0)

        # Pre-populate a scenario report so finalize_report doesn't raise
        self.res.scenario_reports = [
            {
                "name": "test_scenario",
                "window": {"start": self.start.isoformat(), "end": self.end.isoformat()},
                "score": 90,
                "weight": 1,
                "breakdown": {"total_points": 3, "points_lost": 0, "passed": 1, "failed": 0},
                "slo_results": {"slo1": True},
                "health_check_results": {},
            }
        ]

    def _remove_temp_file(self):
        """Delete the alerts temp file if it still exists."""
        if os.path.exists(self.temp_file):
            os.unlink(self.temp_file)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    def test_finalize_and_save_standalone_writes_detailed_file(self, mock_eval_slos):
        """Test that standalone mode writes a detailed JSON report to the given path."""
        mock_eval_slos.return_value = {"slo1": True}

        with tempfile.TemporaryDirectory() as tmpdir:
            detailed_path = os.path.join(tmpdir, "resiliency-report.json")

            self.res.finalize_and_save(
                prom_cli=self.mock_prom,
                total_start_time=self.start,
                total_end_time=self.end,
                run_mode="standalone",
                detailed_path=detailed_path,
            )

            self.assertTrue(os.path.exists(detailed_path))
            with open(detailed_path) as fp:
                report = json.load(fp)
            self.assertIn("scenarios", report)

    @patch('builtins.print')
    @patch('krkn.resiliency.resiliency.evaluate_slos')
    def test_finalize_and_save_controller_mode_prints_to_stdout(self, mock_eval_slos, mock_print):
        """Test that controller mode prints the detailed report to stdout with the expected prefix."""
        mock_eval_slos.return_value = {"slo1": True}

        self.res.finalize_and_save(
            prom_cli=self.mock_prom,
            total_start_time=self.start,
            total_end_time=self.end,
            run_mode="detailed",
        )

        mock_print.assert_called()
        call_args = str(mock_print.call_args)
        self.assertIn("KRKN_RESILIENCY_REPORT_JSON", call_args)

    @patch('krkn.resiliency.resiliency.evaluate_slos')
    def test_finalize_and_save_populates_summary_after_call(self, mock_eval_slos):
        """Test that finalize_and_save populates summary so get_summary works afterward."""
        mock_eval_slos.return_value = {"slo1": True}

        self.res.finalize_and_save(
            prom_cli=self.mock_prom,
            total_start_time=self.start,
            total_end_time=self.end,
        )

        summary = self.res.get_summary()
        self.assertIsNotNone(summary)
        self.assertIn("resiliency_score", summary)
||||
|
||||
# Allow running this test module directly (python <file>.py) in addition
# to discovery via `python -m unittest`.
if __name__ == '__main__':
    unittest.main()
|
||||
409
tests/test_resiliency_score.py
Normal file
409
tests/test_resiliency_score.py
Normal file
@@ -0,0 +1,409 @@
|
||||
"""
|
||||
Tests for krkn.resiliency.score module.
|
||||
|
||||
How to run these tests:
|
||||
|
||||
# Run all tests in this file
|
||||
python -m unittest tests.test_resiliency_score
|
||||
|
||||
# Run all tests with verbose output
|
||||
python -m unittest tests.test_resiliency_score -v
|
||||
|
||||
# Run a specific test class
|
||||
python -m unittest tests.test_resiliency_score.TestSLOResult
|
||||
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore
|
||||
|
||||
# Run a specific test method
|
||||
python -m unittest tests.test_resiliency_score.TestSLOResult.test_slo_result_initialization
|
||||
python -m unittest tests.test_resiliency_score.TestCalculateResiliencyScore.test_all_slos_passing_returns_100
|
||||
|
||||
# Run with coverage
|
||||
python -m coverage run -m unittest tests.test_resiliency_score
|
||||
python -m coverage report -m
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from krkn.resiliency.score import (
|
||||
SLOResult,
|
||||
calculate_resiliency_score,
|
||||
DEFAULT_WEIGHTS,
|
||||
)
|
||||
|
||||
|
||||
class TestSLOResult(unittest.TestCase):
    """Unit tests for the SLOResult class and its weight resolution."""

    def test_slo_result_initialization(self):
        """The constructor stores name, severity and passed flag verbatim."""
        result = SLOResult(name="test_slo", severity="critical", passed=True)

        self.assertEqual(result.name, "test_slo")
        self.assertEqual(result.severity, "critical")
        self.assertTrue(result.passed)

    def test_slo_result_weight_critical_default(self):
        """A critical SLO resolves to the default critical weight (3)."""
        result = SLOResult(name="test_slo", severity="critical", passed=True)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 3)

    def test_slo_result_weight_warning_default(self):
        """A warning SLO resolves to the default warning weight (1)."""
        result = SLOResult(name="test_slo", severity="warning", passed=True)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])
        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 1)

    def test_slo_result_weight_custom_severity_weights(self):
        """Caller-supplied severity weights are used instead of the defaults."""
        overrides = {"critical": 5, "warning": 2}
        crit = SLOResult(name="test1", severity="critical", passed=True)
        warn = SLOResult(name="test2", severity="warning", passed=True)

        self.assertEqual(crit.weight(overrides), 5)
        self.assertEqual(warn.weight(overrides), 2)

    def test_slo_result_weight_unknown_severity_falls_back_to_warning(self):
        """An unrecognised severity string is weighted as a warning."""
        result = SLOResult(name="test_slo", severity="unknown", passed=True)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["warning"])

    def test_slo_result_custom_weight_overrides_severity(self):
        """An explicit per-SLO weight wins over the severity-derived one."""
        result = SLOResult(name="test_slo", severity="critical", passed=True, weight=10)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 10)

    def test_slo_result_custom_weight_zero_is_valid(self):
        """weight=0 is an honoured value, not treated as 'unset'."""
        result = SLOResult(name="test_slo", severity="critical", passed=False, weight=0)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 0)

    def test_slo_result_explicit_none_weight_falls_back_to_severity(self):
        """weight=None means 'use severity weight', so critical -> 3."""
        result = SLOResult(name="test_slo", severity="critical", passed=True, weight=None)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), DEFAULT_WEIGHTS["critical"])
        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 3)

    def test_slo_result_float_custom_weight(self):
        """Fractional custom weights (e.g. 0.5) are returned unchanged."""
        result = SLOResult(name="test_slo", severity="warning", passed=True, weight=0.5)

        self.assertEqual(result.weight(DEFAULT_WEIGHTS), 0.5)
||||
|
||||
class TestCalculateResiliencyScore(unittest.TestCase):
|
||||
"""Test cases for the calculate_resiliency_score function."""
|
||||
|
||||
def test_all_slos_passing_returns_100(self):
|
||||
"""Test that all passing SLOs returns score of 100."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
"slo2": True,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
self.assertEqual(breakdown["points_lost"], 0)
|
||||
|
||||
def test_all_slos_failing_returns_0(self):
|
||||
"""Test that all failing SLOs returns score of 0."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": False,
|
||||
"slo2": False,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(score, 0)
|
||||
self.assertEqual(breakdown["passed"], 0)
|
||||
self.assertEqual(breakdown["failed"], 2)
|
||||
|
||||
def test_mixed_results_calculates_correct_score(self):
|
||||
"""Test score calculation with mixed pass/fail results."""
|
||||
slo_definitions = {
|
||||
"slo_critical": "critical", # weight=3
|
||||
"slo_warning": "warning", # weight=1
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_critical": True, # 3 points
|
||||
"slo_warning": False, # 0 points (lost 1)
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 4 points, Lost: 1 point
|
||||
# Score: (4-1)/4 * 100 = 75%
|
||||
self.assertEqual(score, 75)
|
||||
self.assertEqual(breakdown["total_points"], 4)
|
||||
self.assertEqual(breakdown["points_lost"], 1)
|
||||
self.assertEqual(breakdown["passed"], 1)
|
||||
self.assertEqual(breakdown["failed"], 1)
|
||||
|
||||
def test_slo_not_in_prometheus_results_is_excluded(self):
|
||||
"""Test that SLOs not in prometheus_results are excluded from calculation."""
|
||||
slo_definitions = {
|
||||
"slo1": "critical",
|
||||
"slo2": "warning",
|
||||
"slo3": "critical", # Not in prometheus_results
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
"slo2": True,
|
||||
# slo3 is missing (no data)
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Only slo1 and slo2 should be counted
|
||||
self.assertEqual(score, 100)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 0)
|
||||
|
||||
def test_health_checks_are_treated_as_critical(self):
|
||||
"""Test that health checks are always weighted as critical."""
|
||||
slo_definitions = {}
|
||||
prometheus_results = {}
|
||||
health_check_results = {
|
||||
"http://service1": True,
|
||||
"http://service2": False,
|
||||
}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# 2 health checks, each critical (weight=3)
|
||||
# Total: 6 points, Lost: 3 points (one failed)
|
||||
# Score: (6-3)/6 * 100 = 50%
|
||||
self.assertEqual(score, 50)
|
||||
self.assertEqual(breakdown["total_points"], 6)
|
||||
self.assertEqual(breakdown["points_lost"], 3)
|
||||
|
||||
def test_combined_slos_and_health_checks(self):
|
||||
"""Test calculation with both SLOs and health checks."""
|
||||
slo_definitions = {
|
||||
"slo1": "warning", # weight=1
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": True,
|
||||
}
|
||||
health_check_results = {
|
||||
"health1": True, # weight=3 (critical)
|
||||
"health2": False, # weight=3 (critical)
|
||||
}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 1 + 3 + 3 = 7 points
|
||||
# Lost: 3 points (health2 failed)
|
||||
# Score: (7-3)/7 * 100 = 57.14... = 57%
|
||||
self.assertEqual(score, 57)
|
||||
self.assertEqual(breakdown["total_points"], 7)
|
||||
self.assertEqual(breakdown["points_lost"], 3)
|
||||
self.assertEqual(breakdown["passed"], 2)
|
||||
self.assertEqual(breakdown["failed"], 1)
|
||||
|
||||
def test_per_slo_custom_weight_overrides_severity(self):
|
||||
"""Test that per-SLO custom weight in extended format overrides default severity weight."""
|
||||
slo_definitions = {
|
||||
"slo1": {"severity": "critical", "weight": 10},
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo1": False,
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
self.assertEqual(breakdown["total_points"], 10)
|
||||
self.assertEqual(breakdown["points_lost"], 10)
|
||||
self.assertEqual(score, 0)
|
||||
|
||||
def test_extended_format_mixed_with_legacy_format(self):
|
||||
"""Test that extended dict format and legacy string format can be mixed."""
|
||||
slo_definitions = {
|
||||
"slo_custom": {"severity": "warning", "weight": 5}, # custom weight
|
||||
"slo_legacy": "critical", # legacy, weight=3
|
||||
}
|
||||
prometheus_results = {
|
||||
"slo_custom": False, # loses 5 pts
|
||||
"slo_legacy": True, # keeps 3 pts
|
||||
}
|
||||
health_check_results = {}
|
||||
|
||||
score, breakdown = calculate_resiliency_score(
|
||||
slo_definitions, prometheus_results, health_check_results
|
||||
)
|
||||
|
||||
# Total: 5 + 3 = 8, Lost: 5
|
||||
# Score: (8-5)/8 * 100 = 37.5 -> 37
|
||||
self.assertEqual(breakdown["total_points"], 8)
|
||||
self.assertEqual(breakdown["points_lost"], 5)
|
||||
self.assertEqual(score, 37)
|
||||
|
||||
def test_extended_format_weight_none_falls_back_to_severity(self):
    """An explicit weight of None in the extended dict format must fall
    back to the severity-based default weight."""
    definitions = {"slo1": {"severity": "critical", "weight": None}}

    score, breakdown = calculate_resiliency_score(
        definitions, {"slo1": False}, {}
    )

    # With weight=None the critical default (3) applies; the single
    # failing SLO therefore loses every point in the pool.
    self.assertEqual(breakdown["total_points"], 3)
    self.assertEqual(breakdown["points_lost"], 3)
    self.assertEqual(score, 0)
||||
def test_float_custom_weight_scoring(self):
    """Fractional weights (e.g. weight: 0.5) must be supported as documented."""
    definitions = {
        "slo_high": {"severity": "critical", "weight": 10},
        "slo_low": {"severity": "warning", "weight": 0.5},
    }
    prom = {
        "slo_high": True,   # keeps 10 pts
        "slo_low": False,   # loses 0.5 pts
    }

    score, breakdown = calculate_resiliency_score(definitions, prom, {})

    # Total = 10.5, lost = 0.5 -> (10 / 10.5) * 100 = 95.23..., truncated to 95.
    self.assertEqual(breakdown["total_points"], 10.5)
    self.assertEqual(breakdown["points_lost"], 0.5)
    self.assertEqual(score, 95)
||||
def test_failed_slo_with_zero_weight_does_not_affect_score(self):
    """A failing SLO whose weight is 0 must contribute nothing to points_lost."""
    definitions = {
        "slo_zero": {"severity": "critical", "weight": 0},
        "slo_normal": "warning",  # legacy format, weight=1
    }
    prom = {
        "slo_zero": False,   # fails but is worth 0 points
        "slo_normal": True,
    }

    score, breakdown = calculate_resiliency_score(definitions, prom, {})

    # Only slo_normal contributes to the pool and nothing is lost.
    self.assertEqual(breakdown["total_points"], 1)
    self.assertEqual(breakdown["points_lost"], 0)
    self.assertEqual(score, 100)
||||
def test_all_custom_weight_slos_passing_returns_100(self):
    """When every custom-weight SLO passes the score is 100, whatever the weights."""
    definitions = {
        "slo1": {"severity": "critical", "weight": 20},
        "slo2": {"severity": "warning", "weight": 5},
        "slo3": {"severity": "critical", "weight": 0.5},
    }
    prom = {name: True for name in definitions}

    score, breakdown = calculate_resiliency_score(definitions, prom, {})

    self.assertEqual(score, 100)
    self.assertEqual(breakdown["points_lost"], 0)
    self.assertEqual(breakdown["passed"], 3)
    self.assertEqual(breakdown["failed"], 0)
||||
def test_empty_slo_definitions_returns_zero_score(self):
    """With no SLO definitions at all, the score and every counter must be zero."""
    score, breakdown = calculate_resiliency_score(
        slo_definitions={},
        prometheus_results={},
        health_check_results={},
    )

    self.assertEqual(score, 0)
    # Every breakdown counter starts (and stays) at zero.
    for field in ("total_points", "points_lost", "passed", "failed"):
        self.assertEqual(breakdown[field], 0)
||||
def test_prometheus_results_coerced_to_bool(self):
    """Non-boolean prometheus result values must be interpreted by truthiness."""
    definitions = {name: "warning" for name in ("slo1", "slo2", "slo3")}
    prom = {
        "slo1": 1,     # truthy -> pass
        "slo2": 0,     # falsy  -> fail
        "slo3": None,  # falsy  -> fail
    }

    score, breakdown = calculate_resiliency_score(definitions, prom, {})

    # One of three 1-point SLOs passes:
    # (3 - 2) / 3 * 100 = 33.33..., truncated to 33.
    self.assertEqual(score, 33)
    self.assertEqual(breakdown["passed"], 1)
    self.assertEqual(breakdown["failed"], 2)
||||
def test_score_calculation_rounds_down(self):
    """The percentage score must be truncated to an integer, not rounded up."""
    definitions = {name: "critical" for name in ("slo1", "slo2", "slo3")}  # 3 pts each
    prom = {
        "slo1": True,   # keeps 3 points
        "slo2": True,   # keeps 3 points
        "slo3": False,  # loses 3 points
    }

    score, breakdown = calculate_resiliency_score(definitions, prom, {})

    # Total = 9, lost = 3 -> (9 - 3) / 9 * 100 = 66.666... -> 66 (not 67).
    self.assertEqual(score, 66)
||||
# Run the full suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
Reference in New Issue
Block a user