Compare commits

...

19 Commits

Author SHA1 Message Date
Paige Patton
0d78139fb6 increasing krkn lib version (#906)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-09-08 09:05:53 -04:00
Paige Patton
a3baffe8ee adding vm name option (#904)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m5s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-09-05 12:43:49 -04:00
Tullio Sebastiani
438b08fcd5 [CNCF Incubation] SBOM generation (#900)
fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-09-05 12:43:37 -04:00
Tullio Sebastiani
9b930a02a5 Implemented the new pod monitoring api on kill pod and kill container scenario (#896)
* implemented the new pod monitoring api

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* minor refactoring

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* krkn-lib 5.1.5 update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-09-05 12:42:57 -04:00
Tullio Sebastiani
194e3b87ee fixed test_pod_network_filter flaky test (#905)
syntax



syntax



fix



fix



fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-09-05 11:59:30 -04:00
Paige Patton
8c05e44c23 adding ssh install and virtctl version
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m59s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-09-04 13:57:34 -07:00
Paige Patton
88f8cf49f1 fixing kubevirt name not duplicate namespace
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-09-04 12:45:05 -07:00
Paige Patton
015ba4d90d adding privileged option (#901)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m9s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
2025-09-03 11:14:57 -04:00
Tullio Sebastiani
26fdbef144 [CNCF Incubation] RELEASE.md - release process description (#899)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m43s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* [CNCF Incubation] RELEASE.md - release process description

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

change

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

typo

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added mantainers link

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added mantainers members and owners duties

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-09-02 16:18:30 +02:00
Paige Patton
d77e6dc79c adding maintainers definitions (#898)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-09-02 15:52:45 +02:00
Paige Patton
2885645e77 adding return pod status object not ints (#897)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m40s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-29 09:40:17 -04:00
Paige Patton
84169e2d4e adding no scenario type (#869)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 5m32s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-29 08:55:06 -04:00
Sahil Shah
05bc404d32 Adding IPMI tool to dockerfile
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m56s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Sahil Shah <sahshah@redhat.com>
2025-08-25 12:28:03 -04:00
Paige Patton
e8fd432fc5 adding enable metrics for prometheus coverage (#871)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m31s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-21 17:48:58 +02:00
Tullio Sebastiani
ec05675e3a enabling elastic on main test suite (#892)
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-08-21 15:47:11 +02:00
Tullio Sebastiani
c91648d35c Fixing functional tests (#890)
* Fixes the service hijacking issue

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

test

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fixes the rollback folder issue

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fixes the test issue

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added config options to the main config

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-08-21 15:09:52 +02:00
LIU ZHE YOU
24aa9036b0 [Rollback Scenarios] Fix cleanup_rollback_version_files error (#889)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m57s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* Replace ValueError with warning when directory count is not 1

* Add default config for rollback feature
2025-08-21 12:12:01 +02:00
LIU ZHE YOU
816363d151 [Rollback Scenarios] Perform rollback (#879)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m18s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* Add rollback config

* Inject rollback handler to scenario plugin

* Add Serializer

* Add decorator

* Add test with SimpleRollbackScenarioPlugin

* Add logger for verbose debug flow

* Resolve review comment

- remove additional rollback config in config.yaml
- set KUBECONFIG to ~/.kube/config in test_rollback

* Simplify set_rollback_context_decorator

* Fix integration of rollback_handler in __load_plugins

* Refactor rollback.config module

  - make it singleton class with register method to construct
  - RollbackContext ( <timestamp>-<run_uuid> )
  - add get_rollback_versions_directory for moduling the directory
    format

* Adapt new rollback.config

* Refactor serialization

- respect rollback_callable_name
- refactor _parse_rollback_callable_code
- refine VERSION_FILE_TEMPLATE

* Add get_scenario_rollback_versions_directory in RollbackConfig

* Add rollback in ApplicationOutageScenarioPlugin

* Add RollbackCallable and RollbackContent for type annotation

* Refactor rollback_handler with limited arguments

* Refactor the serialization for rollback

- limited arguments: callback and rollback_content just these two!
- always constuct lib_openshift and lib_telemetry in version file
- add _parse_rollback_content_definition for retrieving scenaio specific
  rollback_content
- remove utils for formating variadic function

* Refactor applicaton outage scenario

* Fix test_rollback

* Make RollbackContent with static fields

* simplify serialization

  - Remove all unused format dynamic arguments utils
  - Add jinja template for version file
  - Replace set_context for serialization with passing version to serialize_callable

* Add rollback for hogs scenario

* Fix version file full path based on feedback

- {versions_directory}/<timestamp(ns)>-<run_uuid>/{scenario_type}-<timestamp(ns)>-<random_hash>.py

* Fix scenario plugins after rebase

* Add rollback config

* Inject rollback handler to scenario plugin

* Add test with SimpleRollbackScenarioPlugin

* Resolve review comment

- remove additional rollback config in config.yaml
- set KUBECONFIG to ~/.kube/config in test_rollback

* Fix integration of rollback_handler in __load_plugins

* Refactor rollback.config module

  - make it singleton class with register method to construct
  - RollbackContext ( <timestamp>-<run_uuid> )
  - add get_rollback_versions_directory for moduling the directory
    format

* Adapt new rollback.config

* Add rollback in ApplicationOutageScenarioPlugin

* Add RollbackCallable and RollbackContent for type annotation

* Refactor applicaton outage scenario

* Fix test_rollback

* Make RollbackContent with static fields

* simplify serialization

  - Remove all unused format dynamic arguments utils
  - Add jinja template for version file
  - Replace set_context for serialization with passing version to serialize_callable

* Add rollback for hogs scenario

* Fix version file full path based on feedback

- {versions_directory}/<timestamp(ns)>-<run_uuid>/{scenario_type}-<timestamp(ns)>-<random_hash>.py

* Fix scenario plugins after rebase

* Add execute rollback

* Add CLI for list and execute rollback

* Replace subprocess with importlib

* Fix error after rebase

* fixup! Fix docstring

- Add telemetry_ocp in execute_rollback docstring
- Remove rollback_config in create_plugin docstring
- Remove scenario_types in set_rollback_callable docsting

* fixup! Replace os.urandom with krkn_lib.utils.get_random_string

* fixup! Add missing telemetry_ocp for execute_rollback_version_files

* fixup! Remove redundant import

- Remove duplicate TYPE_CHECKING in handler module
- Remove cast in signal module
- Remove RollbackConfig in scenario_plugin_factory

* fixup! Replace sys.exit(1) with return

* fixup! Remove duplicate rollback_network_policy

* fixup! Decouple Serializer initialization

* fixup! Rename callback to rollback_callable

* fixup! Refine comment for constructing AbstractScenarioPlugin with
placeholder value

* fixup! Add version in docstring

* fixup! Remove uv.lock
2025-08-20 16:50:52 +02:00
Paige Patton
90c52f907f regex to tools pod names (#886)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m46s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-15 11:13:42 -04:00
43 changed files with 1768 additions and 144 deletions

View File

@@ -16,6 +16,7 @@ jobs:
PREVIOUS_TAG=$(git tag --sort=-creatordate | sed -n '2 p')
echo $PREVIOUS_TAG
echo "PREVIOUS_TAG=$PREVIOUS_TAG" >> "$GITHUB_ENV"
- name: generate release notes from template
id: release-notes
env:
@@ -45,3 +46,15 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
gh release create ${{ github.ref_name }} --title "${{ github.ref_name }}" -F release-notes.md
- name: Install Syft
run: |
curl -sSfL https://raw.githubusercontent.com/anchore/syft/main/install.sh | sudo sh -s -- -b /usr/local/bin
- name: Generate SBOM
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
syft . --scope all-layers --output cyclonedx-json > sbom.json
echo "SBOM generated successfully!"
gh release upload ${{ github.ref_name }} sbom.json

View File

@@ -39,6 +39,8 @@ jobs:
run: |
es_pod_name=$(kubectl get pods -l "app.kubernetes.io/instance=elasticsearch" -o name)
kubectl --namespace default port-forward $es_pod_name 9200 &
prom_name=$(kubectl get pods -n monitoring -l "app.kubernetes.io/name=prometheus" -o name)
kubectl --namespace monitoring port-forward $prom_name 9090 &
kubectl apply -f CI/templates/outage_pod.yaml
kubectl wait --for=condition=ready pod -l scenario=outage --timeout=300s
kubectl apply -f CI/templates/container_scenario_pod.yaml
@@ -66,6 +68,7 @@ jobs:
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=True' CI/config/common_test_config.yaml
yq -i '.performance_monitoring.prometheus_url="http://localhost:9090"' CI/config/common_test_config.yaml
echo "test_service_hijacking" > ./CI/tests/functional_tests
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
@@ -94,8 +97,10 @@ jobs:
yq -i '.kraken.port="8081"' CI/config/common_test_config.yaml
yq -i '.kraken.signal_address="0.0.0.0"' CI/config/common_test_config.yaml
yq -i '.kraken.performance_monitoring="localhost:9090"' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=True' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
yq -i '.performance_monitoring.prometheus_url="http://localhost:9090"' CI/config/common_test_config.yaml
yq -i '.telemetry.username="${{secrets.TELEMETRY_USERNAME}}"' CI/config/common_test_config.yaml
yq -i '.telemetry.password="${{secrets.TELEMETRY_PASSWORD}}"' CI/config/common_test_config.yaml
echo "test_telemetry" > ./CI/tests/functional_tests
@@ -144,12 +149,14 @@ jobs:
path: htmlcov
if-no-files-found: error
- name: Upload json coverage
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v4
with:
name: coverage.json
path: coverage.json
if-no-files-found: error
- name: Check CI results
if: ${{ success() || failure() }}
run: "! grep Fail CI/results.markdown"
badge:

View File

@@ -2,6 +2,8 @@ kraken:
distribution: kubernetes # Distribution can be kubernetes or openshift.
kubeconfig_path: ~/.kube/config # Path to kubeconfig.
exit_on_failure: False # Exit when a post action scenario fails.
auto_rollback: True # Enable auto rollback for scenarios.
rollback_versions_directory: /tmp/kraken-rollback # Directory to store rollback version files.
chaos_scenarios: # List of policies/chaos scenarios to load.
- $scenario_type: # List of chaos pod scenarios to load.
- $scenario_file
@@ -15,8 +17,11 @@ performance_monitoring:
prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
uuid: # uuid for the run is generated by default if not set.
enable_alerts: False # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error.
alert_profile: config/alerts.yaml # Path to alert profile with the prometheus queries.
enable_alerts: True # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error
enable_metrics: True
alert_profile: config/alerts.yaml # Path or URL to alert profile with the prometheus queries
metrics_profile: config/metrics-report.yaml
check_critical_alerts: True # Path to alert profile with the prometheus queries.
tunings:
wait_duration: 6 # Duration to wait between each chaos scenario.

View File

@@ -18,6 +18,7 @@ function functional_test_app_outage {
kubectl get pods
envsubst < CI/config/common_test_config.yaml > CI/config/app_outage.yaml
cat $scenario_file
python3 -m coverage run -a run_kraken.py -c CI/config/app_outage.yaml
echo "App outage scenario test: Success"
}

View File

@@ -11,7 +11,7 @@ function functional_pod_network_filter {
yq -i '.[0].target="pod-network-filter-test"' scenarios/kube/pod-network-filter.yml
yq -i '.[0].protocols=["tcp"]' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ports=[443]' scenarios/kube/pod-network-filter.yml
yq -i '.performance_monitoring.check_critical_alerts=False' CI/config/pod_network_filter.yaml
## Test webservice deployment
kubectl apply -f ./CI/templates/pod_network_filter.yaml
@@ -29,7 +29,9 @@ function functional_pod_network_filter {
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
done
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > /dev/null 2>&1 &
cat scenarios/kube/pod-network-filter.yml
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > krkn_pod_network.out 2>&1 &
PID=$!
# wait until the dns resolution starts failing and the service returns 400
@@ -53,6 +55,7 @@ function functional_pod_network_filter {
done
wait $PID
}
functional_pod_network_filter

View File

@@ -39,7 +39,7 @@ function functional_test_service_hijacking {
export scenario_file="scenarios/kube/service_hijacking.yaml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/service_hijacking.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/service_hijacking.yaml > /dev/null 2>&1 &
python3 -m coverage run -a run_kraken.py -c CI/config/service_hijacking.yaml > /tmp/krkn.log 2>&1 &
PID=$!
#Waiting the hijacking to have effect
COUNTER=0
@@ -100,8 +100,13 @@ function functional_test_service_hijacking {
[ "${PAYLOAD_PATCH_2//[$'\t\r\n ']}" == "${OUT_PATCH//[$'\t\r\n ']}" ] && echo "Step 2 PATCH Payload OK" || (echo "Step 2 PATCH Payload did not match. Test failed." && exit 1)
[ "$OUT_STATUS_CODE" == "$STATUS_CODE_PATCH_2" ] && echo "Step 2 PATCH Status Code OK" || (echo "Step 2 PATCH status code did not match. Test failed." && exit 1)
[ "$OUT_CONTENT" == "$TEXT_MIME" ] && echo "Step 2 PATCH MIME OK" || (echo " Step 2 PATCH MIME did not match. Test failed." && exit 1)
wait $PID
cat /tmp/krkn.log
# now checking if service has been restore correctly and nginx responds correctly
curl -s $SERVICE_URL | grep nginx! && echo "BODY: Service restored!" || (echo "BODY: failed to restore service" && exit 1)
OUT_STATUS_CODE=`curl -X GET -s -o /dev/null -I -w "%{http_code}" $SERVICE_URL`

83
GOVERNANCE.md Normal file
View File

@@ -0,0 +1,83 @@
The governance model adopted here is heavily influenced by a set of CNCF projects, especially drew
reference from [Kubernetes governance](https://github.com/kubernetes/community/blob/master/governance.md).
*For similar structures some of the same wordings from kubernetes governance are borrowed to adhere
to the originally construed meaning.*
## Principles
- **Open**: Krkn is open source community.
- **Welcoming and respectful**: See [Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
- **Transparent and accessible**: Work and collaboration should be done in public.
Changes to the Krkn organization, Krkn code repositories, and CNCF related activities (e.g.
level, involvement, etc) are done in public.
- **Merit**: Ideas and contributions are accepted according to their technical merit
and alignment with project objectives, scope and design principles.
## Code of Conduct
Krkn follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md).
Here is an excerpt:
> As contributors and maintainers of this project, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.
## Maintainer Levels
### Contributor
Contributors contributor to the community. Anyone can become a contributor by participating in discussions, reporting bugs, or contributing code or documentation.
#### Responsibilities:
Be active in the community and adhere to the Code of Conduct.
Report bugs and suggest new features.
Contribute high-quality code and documentation.
### Member
Members are active contributors to the community. Members have demonstrated a strong understanding of the project's codebase and conventions.
#### Responsibilities:
Review pull requests for correctness, quality, and adherence to project standards.
Provide constructive and timely feedback to contributors.
Ensure that all contributions are well-tested and documented.
Work with maintainers to ensure a smooth and efficient release process.
### Maintainer
Maintainers are responsible for the overall health and direction of the project. They are long-standing contributors who have shown a deep commitment to the project's success.
#### Responsibilities:
Set the technical direction and vision for the project.
Manage releases and ensure the stability of the main branch.
Make decisions on feature inclusion and project priorities.
Mentor other contributors and help grow the community.
Resolve disputes and make final decisions when consensus cannot be reached.
### Owner
Owners have administrative access to the project and are the final decision-makers.
#### Responsibilities:
Manage the core team of maintainers and approvers.
Set the overall vision and strategy for the project.
Handle administrative tasks, such as managing the project's repository and other resources.
Represent the project in the broader open-source community.
# Credits
Sections of this documents have been borrowed from [Kubernetes governance](https://github.com/kubernetes/community/blob/master/governance.md)

View File

@@ -1,12 +1,34 @@
## Overview
This document contains a list of maintainers in this repo.
This file lists the maintainers and committers of the Krkn project.
In short, maintainers are people who are in charge of the maintenance of the Krkn project. Committers are active community members who have shown that they are committed to the continuous development of the project through ongoing engagement with the community.
For detailed description of the roles, see [Governance](./GOVERNANCE.md) page.
## Current Maintainers
| Maintainer | GitHub ID | Email |
|---------------------| --------------------------------------------------------- | ----------------------- |
| Ravi Elluri | [chaitanyaenr](https://github.com/chaitanyaenr) | nelluri@redhat.com |
| Pradeep Surisetty | [psuriset](https://github.com/psuriset) | psuriset@redhat.com |
| Paige Rubendall | [paigerube14](https://github.com/paigerube14) | prubenda@redhat.com |
| Tullio Sebastiani | [tsebastiani](https://github.com/tsebastiani) | tsebasti@redhat.com |
| Maintainer | GitHub ID | Email | Contribution Level |
|---------------------| --------------------------------------------------------- | ----------------------- | ---------------------- |
| Ravi Elluri | [chaitanyaenr](https://github.com/chaitanyaenr) | nelluri@redhat.com | Owner |
| Pradeep Surisetty | [psuriset](https://github.com/psuriset) | psuriset@redhat.com | Owner |
| Paige Patton | [paigerube14](https://github.com/paigerube14) | prubenda@redhat.com | Maintainer |
| Tullio Sebastiani | [tsebastiani](https://github.com/tsebastiani) | tsebasti@redhat.com | Maintainer |
| Yogananth Subramanian | [yogananth-subramanian](https://github.com/yogananth-subramanian) | ysubrama@redhat.com |Maintainer |
| Sahil Shah | [shahsahil264](https://github.com/shahsahil264) | sahshah@redhat.com | Member |
Note : It is mandatory for all Krkn community members to follow our [Code of Conduct](./CODE_OF_CONDUCT.md)
## Contributor Ladder
This project follows a contributor ladder model, where contributors can take on more responsibilities as they gain experience and demonstrate their commitment to the project.
The roles are:
* Contributor: A contributor to the community whether it be with code, docs or issues
* Member: A contributor who is active in the community and reviews pull requests.
* Maintainer: A contributor who is responsible for the overall health and direction of the project.
* Owner: A contributor who has administrative ownership of the project.

55
RELEASE.md Normal file
View File

@@ -0,0 +1,55 @@
### Release Protocol: The Community-First Cycle
This document outlines the project's release protocol, a methodology designed to ensure a responsive and transparent development process that is closely aligned with the needs of our users and contributors. This protocol is tailored for projects in their early stages, prioritizing agility and community feedback over a rigid, time-boxed schedule.
#### 1. Key Principles
* **Community as the Compass:** The primary driver for all development is feedback from our user and contributor community.
* **Prioritization by Impact:** Tasks are prioritized based on their impact on user experience, the urgency of bug fixes, and the value of community-contributed features.
* **Event-Driven Releases:** Releases are not bound by a fixed calendar. New versions are published when a significant body of work is complete, a critical issue is resolved, or a new feature is ready for adoption.
* **Transparency and Communication:** All development decisions, progress, and plans are communicated openly through our issue tracker, pull requests, and community channels.
#### 2. The Release Lifecycle
The release cycle is a continuous flow of activities rather than a series of sequential phases.
**2.1. Discovery & Prioritization**
* New features and bug fixes are identified through user feedback on our issue tracker, community discussions, and direct contributions.
* The core maintainers, in collaboration with the community, continuously evaluate and tag issues to create an open and dynamic backlog.
**2.2. Development & Code Review**
* Work is initiated based on the highest-priority items in the backlog.
* All code contributions are made via pull requests (PRs).
* PRs are reviewed by maintainers and other contributors to ensure code quality, adherence to project standards, and overall stability.
**2.3. Release Readiness**
A new release is considered ready when one of the following conditions is met:
* A major new feature has been completed and thoroughly tested.
* A critical security vulnerability or bug has been addressed.
* A sufficient number of smaller improvements and fixes have been merged, providing meaningful value to users.
**2.4. Versioning**
We adhere to [**Semantic Versioning 2.0.0**](https://semver.org/).
* **Major version (`X.y.z`)**: Reserved for releases that introduce breaking changes.
* **Minor version (`x.Y.z`)**: Used for new features or significant non-breaking changes.
* **Patch version (`x.y.Z`)**: Used for bug fixes and small, non-functional improvements.
#### 3. Roles and Responsibilities
* **Members:** The [core team](https://github.com/krkn-chaos/krkn/blob/main/MAINTAINERS.md) responsible for the project's health. Their duties include:
* Reviewing pull requests.
* Contributing code and documentation via pull requests.
* Engaging in discussions and providing feedback.
* **Maintainers and Owners:** The [core team](https://github.com/krkn-chaos/krkn/blob/main/MAINTAINERS.md) responsible for the project's health. Their duties include:
* Facilitating community discussions and prioritization.
* Reviewing and merging pull requests.
* Cutting and announcing official releases.
* **Contributors:** The community. Their duties include:
* Reporting bugs and suggesting new features.
* Contributing code and documentation via pull requests.
* Engaging in discussions and providing feedback.
#### 4. Adoption and Future Evolution
This protocol is designed for the current stage of the project. As the project matures and the contributor base grows, the maintainers will evaluate the need for a more structured methodology to ensure continued scalability and stability.

View File

@@ -1,6 +1,8 @@
kraken:
kubeconfig_path: ~/.kube/config # Path to kubeconfig
exit_on_failure: False # Exit when a post action scenario fails
auto_rollback: True # Enable auto rollback for scenarios.
rollback_versions_directory: /tmp/kraken-rollback # Directory to store rollback version files.
publish_kraken_status: True # Can be accessed at http://0.0.0.0:8081
signal_state: RUN # Will wait for the RUN signal when set to PAUSE before running the scenarios, refer docs/signal.md for more details
signal_address: 0.0.0.0 # Signal listening address
@@ -117,3 +119,10 @@ health_checks: # Utilizing health c
bearer_token: # Bearer token for authentication if any
auth: # Provide authentication credentials (username , password) in tuple format if any, ex:("admin","secretpassword")
exit_on_failure: # If value is True exits when health check failed for application, values can be True/False
kubevirt_checks: # Utilizing virt check endpoints to observe ssh ability to VMI's during chaos injection.
interval: 2 # Interval in seconds to perform virt checks, default value is 2 seconds
namespace: # Namespace where to find VMI's
name: # Regex Name style of VMI's to watch, optional, will watch all VMI names in the namespace if left blank
only_failures: False # Boolean of whether to show all VMI's failures and successful ssh connection (False), or only failure status' (True)
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False

View File

@@ -28,9 +28,14 @@ ENV KUBECONFIG /home/krkn/.kube/config
# This overwrites any existing configuration in /etc/yum.repos.d/kubernetes.repo
RUN dnf update && dnf install -y --setopt=install_weak_deps=False \
git python39 jq yq gettext wget which &&\
git python39 jq yq gettext wget which ipmitool openssh-server &&\
dnf clean all
# Virtctl
RUN export VERSION=$(curl https://storage.googleapis.com/kubevirt-prow/release/kubevirt/kubevirt/stable.txt) && \
wget https://github.com/kubevirt/kubevirt/releases/download/${VERSION}/virtctl-${VERSION}-linux-amd64 && \
chmod +x virtctl-${VERSION}-linux-amd64 && sudo mv virtctl-${VERSION}-linux-amd64 /usr/local/bin/virtctl
# copy oc client binary from oc-build image
COPY --from=oc-build /tmp/oc/oc /usr/bin/oc

View File

@@ -425,6 +425,55 @@
"default": "False",
"required": "false"
},
{
"name": "kubevirt-check-interval",
"short_description": "Kube Virt check interval",
"description": "How often to check the kube virt check Vms ssh status",
"variable": "KUBE_VIRT_CHECK_INTERVAL",
"type": "number",
"default": "2",
"required": "false"
},
{
"name": "kubevirt-namespace",
"short_description": "KubeVirt namespace to check",
"description": "KubeVirt namespace to check the health of",
"variable": "KUBE_VIRT_NAMESPACE",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "kubevirt-name",
"short_description": "KubeVirt regex names to watch",
"description": "KubeVirt regex names to check VMs",
"variable": "KUBE_VIRT_NAME",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "kubevirt-only-failures",
"short_description": "KubeVirt checks only report if failure occurs",
"description": "KubeVirt checks only report if failure occurs",
"variable": "KUBE_VIRT_FAILURES",
"type": "enum",
"allowed_values": "True,False,true,false",
"separator": ",",
"default": "False",
"required": "false"
},
{
"name": "kubevirt-disconnected",
"short_description": "KubeVirt checks in disconnected mode",
"description": "KubeVirt checks in disconnected mode, bypassing the clusters Api",
"variable": "KUBE_VIRT_DISCONNECTED",
"type": "enum",
"allowed_values": "True,False,true,false",
"separator": ",",
"default": "False",
"required": "false"
},
{
"name": "krkn-debug",
"short_description": "Krkn debug mode",

View File

@@ -18,9 +18,8 @@ def invoke(command, timeout=None):
def invoke_no_exit(command, timeout=None):
output = ""
try:
output = subprocess.check_output(command, shell=True, universal_newlines=True, timeout=timeout)
output = subprocess.check_output(command, shell=True, universal_newlines=True, timeout=timeout, stderr=subprocess.DEVNULL)
except Exception as e:
logging.error("Failed to run %s, error: %s" % (command, e))
return str(e)
return output

View File

121
krkn/rollback/command.py Normal file
View File

@@ -0,0 +1,121 @@
import os
import logging
from typing import Optional, TYPE_CHECKING
from krkn.rollback.config import RollbackConfig
from krkn.rollback.handler import execute_rollback_version_files, cleanup_rollback_version_files
if TYPE_CHECKING:
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
def list_rollback(run_uuid: Optional[str]=None, scenario_type: Optional[str]=None):
"""
List rollback version files in a tree-like format.
:param cfg: Configuration file path
:param run_uuid: Optional run UUID to filter by
:param scenario_type: Optional scenario type to filter by
:return: Exit code (0 for success, 1 for error)
"""
logging.info("Listing rollback version files")
versions_directory = RollbackConfig().versions_directory
logging.info(f"Rollback versions directory: {versions_directory}")
# Check if the directory exists first
if not os.path.exists(versions_directory):
logging.info(f"Rollback versions directory does not exist: {versions_directory}")
return 0
# List all directories and files
try:
# Get all run directories
run_dirs = []
for item in os.listdir(versions_directory):
item_path = os.path.join(versions_directory, item)
if os.path.isdir(item_path):
# Apply run_uuid filter if specified
if run_uuid is None or run_uuid in item:
run_dirs.append(item)
if not run_dirs:
if run_uuid:
logging.info(f"No rollback directories found for run_uuid: {run_uuid}")
else:
logging.info("No rollback directories found")
return 0
# Sort directories for consistent output
run_dirs.sort()
print(f"\n{versions_directory}/")
for i, run_dir in enumerate(run_dirs):
is_last_dir = (i == len(run_dirs) - 1)
dir_prefix = "└── " if is_last_dir else "├── "
print(f"{dir_prefix}{run_dir}/")
# List files in this directory
run_dir_path = os.path.join(versions_directory, run_dir)
try:
files = []
for file in os.listdir(run_dir_path):
file_path = os.path.join(run_dir_path, file)
if os.path.isfile(file_path):
# Apply scenario_type filter if specified
if scenario_type is None or file.startswith(scenario_type):
files.append(file)
files.sort()
for j, file in enumerate(files):
is_last_file = (j == len(files) - 1)
file_prefix = " └── " if is_last_dir else "│ └── " if is_last_file else ("│ ├── " if not is_last_dir else " ├── ")
print(f"{file_prefix}{file}")
except PermissionError:
file_prefix = " └── " if is_last_dir else "│ └── "
print(f"{file_prefix}[Permission Denied]")
except Exception as e:
logging.error(f"Error listing rollback directory: {e}")
return 1
return 0
def execute_rollback(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: Optional[str]=None, scenario_type: Optional[str]=None):
"""
Execute rollback version files and cleanup if successful.
:param telemetry_ocp: Instance of KrknTelemetryOpenshift
:param run_uuid: Optional run UUID to filter by
:param scenario_type: Optional scenario type to filter by
:return: Exit code (0 for success, 1 for error)
"""
logging.info("Executing rollback version files")
if not run_uuid:
logging.error("run_uuid is required for execute-rollback command")
return 1
if not scenario_type:
logging.warning("scenario_type is not specified, executing all scenarios in rollback directory")
try:
# Execute rollback version files
logging.info(f"Executing rollback for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
execute_rollback_version_files(telemetry_ocp, run_uuid, scenario_type)
# If execution was successful, cleanup the version files
logging.info("Rollback execution completed successfully, cleaning up version files")
cleanup_rollback_version_files(run_uuid, scenario_type)
logging.info("Rollback execution and cleanup completed successfully")
return 0
except Exception as e:
logging.error(f"Error during rollback execution: {e}")
return 1

189
krkn/rollback/config.py Normal file
View File

@@ -0,0 +1,189 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, TYPE_CHECKING, Optional
from typing_extensions import TypeAlias
import time
import os
import logging
from krkn_lib.utils import get_random_string
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
RollbackCallable: TypeAlias = Callable[
["RollbackContent", "KrknTelemetryOpenshift"], None
]
if TYPE_CHECKING:
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
RollbackCallable: TypeAlias = Callable[
["RollbackContent", "KrknTelemetryOpenshift"], None
]
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
@dataclass(frozen=True)
class RollbackContent:
"""
RollbackContent is a dataclass that defines the necessary fields for rollback operations.
"""
resource_identifier: str
namespace: Optional[str] = None
def __str__(self):
namespace = f'"{self.namespace}"' if self.namespace else "None"
resource_identifier = f'"{self.resource_identifier}"'
return f"RollbackContent(namespace={namespace}, resource_identifier={resource_identifier})"
class RollbackContext(str):
"""
RollbackContext is a string formatted as '<timestamp (s) >-<run_uuid>'.
It represents the context for rollback operations, uniquely identifying a run.
"""
def __new__(cls, run_uuid: str):
return super().__new__(cls, f"{time.time_ns()}-{run_uuid}")
class RollbackConfig(metaclass=SingletonMeta):
"""Configuration for the rollback scenarios."""
def __init__(self):
self._auto = False
self._versions_directory = ""
self._registered = False
@property
def auto(self):
return self._auto
@auto.setter
def auto(self, value):
if self._registered:
raise AttributeError("Can't modify 'auto' after registration")
self._auto = value
@property
def versions_directory(self):
return self._versions_directory
@versions_directory.setter
def versions_directory(self, value):
if self._registered:
raise AttributeError("Can't modify 'versions_directory' after registration")
self._versions_directory = value
@classmethod
def register(cls, auto=False, versions_directory=""):
"""Initialize and return the singleton instance with given configuration."""
instance = cls()
instance.auto = auto
instance.versions_directory = versions_directory
instance._registered = True
return instance
@classmethod
def get_rollback_versions_directory(cls, rollback_context: RollbackContext) -> str:
"""
Get the rollback context directory for a given rollback context.
:param rollback_context: The rollback context string.
:return: The path to the rollback context directory.
"""
return f"{cls().versions_directory}/{rollback_context}"
@classmethod
def search_rollback_version_files(cls, run_uuid: str, scenario_type: str | None = None) -> list[str]:
"""
Search for rollback version files based on run_uuid and scenario_type.
1. Search directories with "run_uuid" in name under "cls.versions_directory".
2. Search files in those directories that start with "scenario_type" in matched directories in step 1.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario.
:return: List of version file paths.
"""
if not os.path.exists(cls().versions_directory):
return []
rollback_context_directories = [
dirname for dirname in os.listdir(cls().versions_directory) if run_uuid in dirname
]
if not rollback_context_directories:
logger.warning(f"No rollback context directories found for run UUID {run_uuid}")
return []
if len(rollback_context_directories) > 1:
logger.warning(
f"Expected one directory for run UUID {run_uuid}, found: {rollback_context_directories}"
)
rollback_context_directory = rollback_context_directories[0]
version_files = []
scenario_rollback_versions_directory = os.path.join(
cls().versions_directory, rollback_context_directory
)
for file in os.listdir(scenario_rollback_versions_directory):
# assert all files start with scenario_type and end with .py
if file.endswith(".py") and (scenario_type is None or file.startswith(scenario_type)):
version_files.append(
os.path.join(scenario_rollback_versions_directory, file)
)
else:
logger.warning(
f"File {file} does not match expected pattern for scenario type {scenario_type}"
)
return version_files
@dataclass(frozen=True)
class Version:
scenario_type: str
rollback_context: RollbackContext
timestamp: int = time.time_ns() # Get current timestamp in nanoseconds
hash_suffix: str = get_random_string(8) # Generate a random string of 8 characters
@property
def version_file_name(self) -> str:
"""
Generate a version file name based on the timestamp and hash suffix.
:return: The generated version file name.
"""
return f"{self.scenario_type}_{self.timestamp}_{self.hash_suffix}.py"
@property
def version_file_full_path(self) -> str:
"""
Get the full path for the version file based on the version object and current context.
:return: The generated version file full path.
"""
return f"{RollbackConfig.get_rollback_versions_directory(self.rollback_context)}/{self.version_file_name}"
@staticmethod
def new_version(scenario_type: str, rollback_context: RollbackContext) -> "Version":
"""
Get the current version of the rollback configuration.
:return: An instance of Version with the current timestamp and hash suffix.
"""
return Version(
scenario_type=scenario_type,
rollback_context=rollback_context,
)

238
krkn/rollback/handler.py Normal file
View File

@@ -0,0 +1,238 @@
from __future__ import annotations
import logging
from typing import cast, TYPE_CHECKING
import os
import importlib.util
import inspect
from krkn.rollback.config import RollbackConfig, RollbackContext, Version
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent, RollbackCallable
from krkn.rollback.serialization import Serializer
def set_rollback_context_decorator(func):
"""
Decorator to automatically set and clear rollback context.
It extracts run_uuid from the function arguments and sets the context in rollback_handler
before executing the function, and clears it after execution.
Usage:
.. code-block:: python
from krkn.rollback.handler import set_rollback_context_decorator
# for any scenario plugin that inherits from AbstractScenarioPlugin
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
):
# Your scenario logic here
pass
"""
def wrapper(self, *args, **kwargs):
self = cast("AbstractScenarioPlugin", self)
# Since `AbstractScenarioPlugin.run_scenarios` will call `self.run` and pass all parameters as `kwargs`
logger.debug(f"kwargs of ScenarioPlugin.run: {kwargs}")
run_uuid = kwargs.get("run_uuid", None)
# so we can safely assume that `run_uuid` will be present in `kwargs`
assert run_uuid is not None, "run_uuid must be provided in kwargs"
# Set context if run_uuid is available and rollback_handler exists
if run_uuid and hasattr(self, "rollback_handler"):
self.rollback_handler = cast("RollbackHandler", self.rollback_handler)
self.rollback_handler.set_context(run_uuid)
try:
# Execute the `run` method with the original arguments
result = func(self, *args, **kwargs)
return result
finally:
# Clear context after function execution, regardless of success or failure
if hasattr(self, "rollback_handler"):
self.rollback_handler = cast("RollbackHandler", self.rollback_handler)
self.rollback_handler.clear_context()
return wrapper
def _parse_rollback_module(version_file_path: str) -> tuple[RollbackCallable, RollbackContent]:
"""
Parse a rollback module to extract the rollback function and RollbackContent.
:param version_file_path: Path to the rollback version file
:return: Tuple of (rollback_callable, rollback_content)
"""
# Create a unique module name based on the file path
module_name = f"rollback_module_{os.path.basename(version_file_path).replace('.py', '').replace('-', '_')}"
# Load the module using importlib
spec = importlib.util.spec_from_file_location(module_name, version_file_path)
if spec is None or spec.loader is None:
raise ImportError(f"Could not load module from {version_file_path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find the rollback function
rollback_callable = None
for name, obj in inspect.getmembers(module):
if inspect.isfunction(obj) and name.startswith('rollback_'):
# Check function signature
sig = inspect.signature(obj)
params = list(sig.parameters.values())
if (len(params) == 2 and
'RollbackContent' in str(params[0].annotation) and
'KrknTelemetryOpenshift' in str(params[1].annotation)):
rollback_callable = obj
logger.debug(f"Found rollback function: {name}")
break
if rollback_callable is None:
raise ValueError(f"No valid rollback function found in {version_file_path}")
# Find the rollback_content variable
if not hasattr(module, 'rollback_content'):
raise ValueError("Could not find variable named 'rollback_content' in the module")
rollback_content = getattr(module, 'rollback_content', None)
if rollback_content is None:
raise ValueError("Variable 'rollback_content' is None")
logger.debug(f"Found rollback_content variable in module: {rollback_content}")
return rollback_callable, rollback_content
def execute_rollback_version_files(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: str, scenario_type: str | None = None):
"""
Execute rollback version files for the given run_uuid and scenario_type.
This function is called when a signal is received to perform rollback operations.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
"""
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip execution for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
return
# Execute all version files in the directory
logger.info(f"Executing rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
for version_file in version_files:
try:
logger.info(f"Executing rollback version file: {version_file}")
# Parse the rollback module to get function and content
rollback_callable, rollback_content = _parse_rollback_module(version_file)
# Execute the rollback function
logger.info('Executing rollback callable...')
rollback_callable(rollback_content, telemetry_ocp)
logger.info('Rollback completed.')
logger.info(f"Executed {version_file} successfully.")
except Exception as e:
logger.error(f"Failed to execute rollback version file {version_file}: {e}")
raise
def cleanup_rollback_version_files(run_uuid: str, scenario_type: str):
"""
Cleanup rollback version files for the given run_uuid and scenario_type.
This function is called to remove the rollback version files after execution.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
"""
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip cleanup for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
return
# Remove all version files in the directory
logger.info(f"Cleaning up rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type}")
for version_file in version_files:
try:
os.remove(version_file)
logger.info(f"Removed {version_file} successfully.")
except Exception as e:
logger.error(f"Failed to remove rollback version file {version_file}: {e}")
raise
class RollbackHandler:
def __init__(
self,
scenario_type: str,
serializer: "Serializer",
):
self.scenario_type = scenario_type
self.serializer = serializer
self.rollback_context: RollbackContext | None = (
None # will be set when `set_context` is called
)
def set_context(self, run_uuid: str):
"""
Set the context for the rollback handler.
:param run_uuid: Unique identifier for the run.
"""
self.rollback_context = RollbackContext(run_uuid)
logger.info(
f"Set rollback_context: {self.rollback_context} for scenario_type: {self.scenario_type} RollbackHandler"
)
def clear_context(self):
"""
Clear the run_uuid context for the rollback handler.
"""
logger.debug(
f"Clear rollback_context {self.rollback_context} for scenario type {self.scenario_type} RollbackHandler"
)
self.rollback_context = None
def set_rollback_callable(
self,
callable: "RollbackCallable",
rollback_content: "RollbackContent",
):
"""
Set the rollback callable to be executed after the scenario is finished.
:param callable: The rollback callable to be set.
:param rollback_content: The rollback content for the callable.
"""
logger.debug(
f"Rollback callable set to {callable.__name__} for version directory {RollbackConfig.get_rollback_versions_directory(self.rollback_context)}"
)
version: Version = Version.new_version(
scenario_type=self.scenario_type,
rollback_context=self.rollback_context,
)
# Serialize the callable to a file
try:
version_file = self.serializer.serialize_callable(
callable, rollback_content, version
)
logger.info(f"Rollback callable serialized to {version_file}")
except Exception as e:
logger.error(f"Failed to serialize rollback callable: {e}")

View File

@@ -0,0 +1,123 @@
import inspect
import os
import logging
from typing import TYPE_CHECKING
from jinja2 import Environment, FileSystemLoader
if TYPE_CHECKING:
from krkn.rollback.config import RollbackCallable, RollbackContent, Version
logger = logging.getLogger(__name__)
class Serializer:
def __init__(self, scenario_type: str):
self.scenario_type = scenario_type
# Set up Jinja2 environment to load templates from the rollback directory
template_dir = os.path.join(os.path.dirname(__file__))
env = Environment(loader=FileSystemLoader(template_dir))
self.template = env.get_template("version_template.j2")
def _parse_rollback_callable_code(
self, rollback_callable: "RollbackCallable"
) -> tuple[str, str]:
"""
Parse the rollback callable code to extract its implementation.
:param rollback_callable: The callable function to parse (can be staticmethod or regular function).
:return: A tuple containing (function_name, function_code).
"""
# Get the implementation code of the rollback_callable
rollback_callable_code = inspect.getsource(rollback_callable)
# Split into lines for processing
code_lines = rollback_callable_code.split("\n")
cleaned_lines = []
function_name = None
# Find the function definition line and extract function name
def_line_index = None
for i, line in enumerate(code_lines):
# Skip decorators (including @staticmethod)
if line.strip().startswith("@"):
continue
# Look for function definition
if line.strip().startswith("def "):
def_line_index = i
# Extract function name from the def line
def_line = line.strip()
if "(" in def_line:
function_name = def_line.split("def ")[1].split("(")[0].strip()
break
if def_line_index is None or function_name is None:
raise ValueError(
"Could not find function definition in callable source code"
)
# Get the base indentation level from the def line
def_line = code_lines[def_line_index]
base_indent_level = len(def_line) - len(def_line.lstrip())
# Process all lines starting from the def line
for i in range(def_line_index, len(code_lines)):
line = code_lines[i]
# Handle empty lines
if not line.strip():
cleaned_lines.append("")
continue
# Calculate current line's indentation
current_indent = len(line) - len(line.lstrip())
# Remove the base indentation to normalize to function level
if current_indent >= base_indent_level:
# Remove base indentation
normalized_line = line[base_indent_level:]
cleaned_lines.append(normalized_line)
else:
# This shouldn't happen in well-formed code, but handle it gracefully
cleaned_lines.append(line.lstrip())
# Reconstruct the code and clean up trailing whitespace
function_code = "\n".join(cleaned_lines).rstrip()
return function_name, function_code
def serialize_callable(
self,
rollback_callable: "RollbackCallable",
rollback_content: "RollbackContent",
version: "Version",
) -> str:
"""
Serialize a callable function to a file with its arguments and keyword arguments.
:param rollback_callable: The callable to serialize.
:param rollback_content: The rollback content for the callable.
:param version: The version representing the rollback context and file path for the rollback.
:return: Path to the serialized callable file.
"""
rollback_callable_name, rollback_callable_code = (
self._parse_rollback_callable_code(rollback_callable)
)
# Render the template with the required variables
file_content = self.template.render(
rollback_callable_name=rollback_callable_name,
rollback_callable_code=rollback_callable_code,
rollback_content=str(rollback_content),
)
# Write the file to the version directory
os.makedirs(os.path.dirname(version.version_file_full_path), exist_ok=True)
logger.debug("Creating version file at %s", version.version_file_full_path)
logger.debug("Version file content:\n%s", file_content)
with open(version.version_file_full_path, "w") as f:
f.write(file_content)
logger.info(f"Serialized callable written to {version.version_file_full_path}")
return version.version_file_full_path

106
krkn/rollback/signal.py Normal file
View File

@@ -0,0 +1,106 @@
from typing import Dict, Any, Optional
import threading
import signal
import sys
import logging
from contextlib import contextmanager
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.rollback.handler import execute_rollback_version_files
logger = logging.getLogger(__name__)
class SignalHandler:
# Class-level variables for signal handling (shared across all instances)
_signal_handlers_installed = False # No need for thread-safe variable due to _signal_lock
_original_handlers: Dict[int, Any] = {}
_signal_lock = threading.Lock()
# Thread-local storage for context
_local = threading.local()
@classmethod
def _set_context(cls, run_uuid: str, scenario_type: str, telemetry_ocp: KrknTelemetryOpenshift):
"""Set the current execution context for this thread."""
cls._local.run_uuid = run_uuid
cls._local.scenario_type = scenario_type
cls._local.telemetry_ocp = telemetry_ocp
logger.debug(f"Set signal context set for thread {threading.current_thread().name} - run_uuid={run_uuid}, scenario_type={scenario_type}")
@classmethod
def _get_context(cls) -> tuple[Optional[str], Optional[str], Optional[KrknTelemetryOpenshift]]:
"""Get the current execution context for this thread."""
run_uuid = getattr(cls._local, 'run_uuid', None)
scenario_type = getattr(cls._local, 'scenario_type', None)
telemetry_ocp = getattr(cls._local, 'telemetry_ocp', None)
return run_uuid, scenario_type, telemetry_ocp
@classmethod
def _signal_handler(cls, signum: int, frame):
"""Handle signals with current thread context information."""
signal_name = signal.Signals(signum).name
run_uuid, scenario_type, telemetry_ocp = cls._get_context()
if not run_uuid or not scenario_type or not telemetry_ocp:
logger.warning(f"Signal {signal_name} received without complete context, skipping rollback.")
return
# Clear the context for the next signal, as another signal may arrive before the rollback completes.
# This ensures that the rollback is performed only once.
cls._set_context(None, None, telemetry_ocp)
# Perform rollback
logger.info(f"Performing rollback for signal {signal_name} with run_uuid={run_uuid}, scenario_type={scenario_type}")
execute_rollback_version_files(telemetry_ocp, run_uuid, scenario_type)
# Call original handler if it exists
if signum not in cls._original_handlers:
logger.info(f"Signal {signal_name} has no registered handler, exiting...")
return
original_handler = cls._original_handlers[signum]
if callable(original_handler):
logger.info(f"Calling original handler for {signal_name}")
original_handler(signum, frame)
elif original_handler == signal.SIG_DFL:
# Restore default behavior
logger.info(f"Restoring default signal handler for {signal_name}")
signal.signal(signum, signal.SIG_DFL)
signal.raise_signal(signum)
@classmethod
def _register_signal_handler(cls):
"""Register signal handlers once (called by first instance)."""
with cls._signal_lock: # Lock protects _signal_handlers_installed from race conditions
if cls._signal_handlers_installed:
return
signals_to_handle = [signal.SIGINT, signal.SIGTERM]
if hasattr(signal, 'SIGHUP'):
signals_to_handle.append(signal.SIGHUP)
for sig in signals_to_handle:
try:
original_handler = signal.signal(sig, cls._signal_handler)
cls._original_handlers[sig] = original_handler
logger.debug(f"SignalHandler: Registered signal handler for {signal.Signals(sig).name}")
except (OSError, ValueError) as e:
logger.warning(f"AbstractScenarioPlugin: Could not register handler for signal {sig}: {e}")
cls._signal_handlers_installed = True
logger.info("Signal handlers registered globally")
@classmethod
@contextmanager
def signal_context(cls, run_uuid: str, scenario_type: str, telemetry_ocp: KrknTelemetryOpenshift):
"""Context manager to set the signal context for the current thread."""
cls._set_context(run_uuid, scenario_type, telemetry_ocp)
cls._register_signal_handler()
try:
yield
finally:
# Clear context after exiting the context manager
cls._set_context(None, None, telemetry_ocp)
signal_handler = SignalHandler()

View File

@@ -0,0 +1,55 @@
# This file is auto-generated by krkn-lib.
# It contains the rollback callable and its arguments for the scenario plugin.
from dataclasses import dataclass
import os
import logging
from typing import Optional
from krkn_lib.utils import SafeLogger
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
@dataclass(frozen=True)
class RollbackContent:
resource_identifier: str
namespace: Optional[str] = None
# Actual rollback callable
{{ rollback_callable_code }}
# Create necessary variables for execution
lib_openshift = None
lib_telemetry = None
rollback_content = {{ rollback_content }}
# Main entry point for execution
if __name__ == '__main__':
# setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
]
)
# setup logging and get kubeconfig path
kubeconfig_path = os.getenv("KUBECONFIG", "~/.kube/config")
log_directory = os.path.dirname(os.path.abspath(__file__))
os.makedirs(os.path.join(log_directory, 'logs'), exist_ok=True)
# setup SafeLogger for telemetry
telemetry_log_path = os.path.join(log_directory, 'logs', 'telemetry.log')
safe_logger = SafeLogger(telemetry_log_path)
# setup krkn-lib objects
lib_openshift = KrknOpenshift(kubeconfig_path=kubeconfig_path)
lib_telemetry = KrknTelemetryOpenshift(safe_logger=safe_logger, lib_openshift=lib_openshift)
# execute
logging.info('Executing rollback callable...')
{{ rollback_callable_name }}(
rollback_content,
lib_telemetry
)
logging.info('Rollback completed.')

View File

@@ -5,9 +5,26 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn import utils
from krkn.rollback.handler import (
RollbackHandler,
execute_rollback_version_files,
cleanup_rollback_version_files
)
from krkn.rollback.signal import signal_handler
from krkn.rollback.serialization import Serializer
class AbstractScenarioPlugin(ABC):
def __init__(self, scenario_type: str = "placeholder_scenario_type"):
"""Initializes the AbstractScenarioPlugin with the scenario type and rollback configuration.
:param scenario_type: the scenario type defined in the config.yaml
"""
serializer = Serializer(
scenario_type=scenario_type,
)
self.rollback_handler = RollbackHandler(scenario_type, serializer)
@abstractmethod
def run(
self,
@@ -74,24 +91,38 @@ class AbstractScenarioPlugin(ABC):
scenario_telemetry, scenario_config
)
try:
logging.info(
f"Running {self.__class__.__name__}: {self.get_scenario_types()} -> {scenario_config}"
)
return_value = self.run(
run_uuid,
scenario_config,
krkn_config,
telemetry,
scenario_telemetry,
)
except Exception as e:
logging.error(
f"uncaught exception on scenario `run()` method: {e} "
f"please report an issue on https://github.com/krkn-chaos/krkn"
)
return_value = 1
with signal_handler.signal_context(
run_uuid=run_uuid,
scenario_type=scenario_telemetry.scenario_type,
telemetry_ocp=telemetry
):
try:
logging.info(
f"Running {self.__class__.__name__}: {self.get_scenario_types()} -> {scenario_config}"
)
# pass all the parameters by kwargs to make `set_rollback_context_decorator` get the `run_uuid` and `scenario_type`
return_value = self.run(
run_uuid=run_uuid,
scenario=scenario_config,
krkn_config=krkn_config,
lib_telemetry=telemetry,
scenario_telemetry=scenario_telemetry,
)
except Exception as e:
logging.error(
f"uncaught exception on scenario `run()` method: {e} "
f"please report an issue on https://github.com/krkn-chaos/krkn"
)
return_value = 1
# execute rollback files based on the return value
if return_value != 0:
execute_rollback_version_files(
telemetry, run_uuid, scenario_telemetry.scenario_type
)
cleanup_rollback_version_files(
run_uuid, scenario_telemetry.scenario_type
)
scenario_telemetry.exit_status = return_value
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(
@@ -118,4 +149,4 @@ class AbstractScenarioPlugin(ABC):
time.sleep(wait_duration)
return failed_scenarios, scenario_telemetries

View File

@@ -7,9 +7,12 @@ from krkn_lib.utils import get_yaml_item_value, get_random_string
from jinja2 import Template
from krkn import cerberus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
@@ -57,6 +60,13 @@ class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
# Block the traffic by creating network policy
logging.info("Creating the network policy")
self.rollback_handler.set_rollback_callable(
self.rollback_network_policy,
RollbackContent(
namespace=namespace,
resource_identifier=policy_name,
),
)
lib_telemetry.get_lib_kubernetes().create_net_policy(
yaml_spec, namespace
)
@@ -89,5 +99,26 @@ class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
else:
return 0
@staticmethod
def rollback_network_policy(
rollback_content: RollbackContent,
lib_telemetry: KrknTelemetryOpenshift,
):
"""Rollback function to delete the network policy created during the scenario.
:param rollback_content: Rollback content containing namespace and resource_identifier.
:param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations.
"""
try:
namespace = rollback_content.namespace
policy_name = rollback_content.resource_identifier
logging.info(
f"Rolling back network policy: {policy_name} in namespace: {namespace}"
)
lib_telemetry.get_lib_kubernetes().delete_net_policy(policy_name, namespace)
logging.info("Network policy rollback completed successfully.")
except Exception as e:
logging.error(f"Failed to rollback network policy: {e}")
def get_scenario_types(self) -> list[str]:
return ["application_outages_scenarios"]

View File

@@ -1,10 +1,10 @@
import logging
import random
import time
from asyncio import Future
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn_lib.k8s.pod_monitor import select_and_monitor_by_namespace_pattern_and_label
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
@@ -22,27 +22,21 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
try:
with open(scenario, "r") as f:
cont_scenario_config = yaml.full_load(f)
for kill_scenario in cont_scenario_config["scenarios"]:
self.start_monitoring(
kill_scenario, pool
future_snapshot = self.start_monitoring(
kill_scenario,
lib_telemetry
)
killed_containers = self.container_killing_in_pod(
self.container_killing_in_pod(
kill_scenario, lib_telemetry.get_lib_kubernetes()
)
result = pool.join()
if result.error:
logging.error(
logging.error(
f"ContainerScenarioPlugin pods failed to recovery: {result.error}"
)
)
return 1
scenario_telemetry.affected_pods = result
snapshot = future_snapshot.result()
result = snapshot.get_pods_status()
scenario_telemetry.affected_pods = result
except (RuntimeError, Exception):
logging.error("ContainerScenarioPlugin exiting due to Exception %s")
@@ -53,17 +47,18 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
def get_scenario_types(self) -> list[str]:
return ["container_scenarios"]
def start_monitoring(self, kill_scenario: dict, pool: PodsMonitorPool):
def start_monitoring(self, kill_scenario: dict, lib_telemetry: KrknTelemetryOpenshift) -> Future:
namespace_pattern = f"^{kill_scenario['namespace']}$"
label_selector = kill_scenario["label_selector"]
recovery_time = kill_scenario["expected_recovery_time"]
pool.select_and_monitor_by_namespace_pattern_and_label(
future_snapshot = select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
v1_client=lib_telemetry.get_lib_kubernetes().cli
)
return future_snapshot
def container_killing_in_pod(self, cont_scenario, kubecli: KrknKubernetes):
scenario_name = get_yaml_item_value(cont_scenario, "name", "")

View File

@@ -16,9 +16,13 @@ from krkn_lib.k8s import KrknKubernetes
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
class HogsScenarioPlugin(AbstractScenarioPlugin):
@set_rollback_context_decorator
def run(self, run_uuid: str, scenario: str, krkn_config: dict[str, any], lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry) -> int:
try:
@@ -79,6 +83,13 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
config.node_selector = f"kubernetes.io/hostname={node}"
pod_name = f"{config.type.value}-hog-{get_random_string(5)}"
node_resources_start = lib_k8s.get_node_resources_info(node)
self.rollback_handler.set_rollback_callable(
self.rollback_hog_pod,
RollbackContent(
namespace=config.namespace,
resource_identifier=pod_name,
),
)
lib_k8s.deploy_hog(pod_name, config)
start = time.time()
# waiting 3 seconds before starting sample collection
@@ -150,3 +161,22 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
raise exception
except queue.Empty:
pass
@staticmethod
def rollback_hog_pod(rollback_content: RollbackContent, lib_telemetry: KrknTelemetryOpenshift):
"""
Rollback function to delete hog pod.
:param rollback_content: Rollback content containing namespace and resource_identifier.
:param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations
"""
try:
namespace = rollback_content.namespace
pod_name = rollback_content.resource_identifier
logging.info(
f"Rolling back hog pod: {pod_name} in namespace: {namespace}"
)
lib_telemetry.get_lib_kubernetes().delete_pod(pod_name, namespace)
logging.info("Rollback of hog pod completed successfully.")
except Exception as e:
logging.error(f"Failed to rollback hog pod: {e}")

View File

@@ -20,7 +20,9 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
"""
def __init__(self):
def __init__(self, scenario_type: str = None):
scenario_type = self.get_scenario_types()[0]
super().__init__(scenario_type)
self.k8s_client = None
self.original_vmi = None
@@ -121,7 +123,7 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
return None
return []
else:
logging.error(f"Error getting VMI {regex_name}: {e}")
raise
@@ -137,6 +139,7 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
:param scenario_telemetry: The telemetry object for recording metrics
:return: 0 for success, 1 for failure
"""
self.pods_status = PodsStatus()
try:
params = config.get("parameters", {})
vm_name = params.get("vm_name")
@@ -144,18 +147,19 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
timeout = params.get("timeout", 60)
kill_count = params.get("kill_count", 1)
disable_auto_restart = params.get("disable_auto_restart", False)
self.pods_status = PodsStatus()
if not vm_name:
logging.error("vm_name parameter is required")
return 1
raise Exception("vm_name parameter is required")
vmis_list = self.get_vmis(vm_name,namespace)
if len(vmis_list) == 0:
raise Exception(f"No matching VMs with name {vm_name} in namespace {namespace}")
rand_int = random.randint(0, len(vmis_list) - 1)
vmi = vmis_list[rand_int]
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
vmi_name = vmi.get("metadata").get("name")
if not self.validate_environment(vmi_name, namespace):
return 1
return self.pods_status
vmi = self.get_vmi(vmi_name, namespace)
self.affected_pod = AffectedPod(
@@ -164,19 +168,16 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return 1
return self.pods_status
self.original_vmi = vmi
logging.info(f"Captured initial state of VMI: {vm_name}")
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
if result != 0:
return self.pods_status
result = self.wait_for_running(vmi_name,namespace, timeout)
if result != 0:
self.recover(vmi_name, namespace)
self.pods_status.unrecovered = self.affected_pod
return self.pods_status
self.affected_pod.total_recovery_time = (
@@ -192,7 +193,7 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
except Exception as e:
logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
log_exception(e)
return 1
return self.pods_status
def validate_environment(self, vm_name: str, namespace: str) -> bool:
"""

View File

@@ -5,6 +5,7 @@ import time
import sys
import os
import re
import random
from traceback import format_exc
from jinja2 import Environment, FileSystemLoader
from . import kubernetes_functions as kube_helper
@@ -168,14 +169,14 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -
Returns:
Default interface (string) belonging to the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
pod_name = f"fedtools-{pod_name_regex}"
try:
cmd = ["ip", "r"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, "fedtools", "default")
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -191,7 +192,7 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -
finally:
logging.info("Deleting pod to query interface on node")
kube_helper.delete_pod(cli, "fedtools", "default")
kube_helper.delete_pod(cli, pod_name, "default")
return interfaces
@@ -220,13 +221,15 @@ def verify_interface(
Returns:
The interface list for the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
pod_name = f"fedtools-{pod_name_regex}"
try:
if input_interface_list == []:
cmd = ["ip", "r"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, "fedtools", "default")
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -242,7 +245,7 @@ def verify_interface(
else:
cmd = ["ip", "-br", "addr", "show"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, "fedtools", "default")
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -265,7 +268,7 @@ def verify_interface(
)
finally:
logging.info("Deleting pod to query interface on node")
kube_helper.delete_pod(cli, "fedtools", "default")
kube_helper.delete_pod(cli, pod_name, "default")
return input_interface_list
@@ -431,16 +434,18 @@ def create_virtual_interfaces(
- The YAML template used to instantiate a pod to create
virtual interfaces on the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
logging.info(
"Creating {0} virtual interfaces on node {1} using a pod".format(
len(interface_list), node
)
)
create_ifb(cli, len(interface_list), "modtools")
pod_name = f"modtools-{pod_name_regex}"
create_ifb(cli, len(interface_list), pod_name)
logging.info("Deleting pod used to create virtual interfaces")
kube_helper.delete_pod(cli, "modtools", "default")
kube_helper.delete_pod(cli, pod_name, "default")
def delete_virtual_interfaces(
@@ -467,11 +472,13 @@ def delete_virtual_interfaces(
"""
for node in node_list:
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
logging.info("Deleting all virtual interfaces on node {0}".format(node))
delete_ifb(cli, "modtools")
kube_helper.delete_pod(cli, "modtools", "default")
pod_name = f"modtools-{pod_name_regex}"
delete_ifb(cli, pod_name)
kube_helper.delete_pod(cli, pod_name, "default")
def create_ifb(cli: CoreV1Api, number: int, pod_name: str):

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: Pod
metadata:
name: fedtools
name: fedtools-{{regex_name}}
spec:
hostNetwork: true
nodeName: {{nodename}}

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: Pod
metadata:
name: modtools
name: modtools-{{regex_name}}
spec:
nodeName: {{nodename}}
containers:

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: Pod
metadata:
name: modtools
name: modtools-{{regex_name}}
spec:
nodeName: {{nodename}}
containers:

View File

@@ -537,7 +537,7 @@ def get_egress_cmd(
def create_virtual_interfaces(
kubecli: KrknKubernetes, nummber: int, node: str, pod_template, image: str,
kubecli: KrknKubernetes, number: int, node: str, pod_template, image: str,
) -> None:
"""
Function that creates a privileged pod and uses it to create
@@ -561,14 +561,16 @@ def create_virtual_interfaces(
image (string)
- Image of network chaos tool
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"modtools-{pod_name_regex}"
logging.info(
"Creating {0} virtual interfaces on node {1} using a pod".format(nummber, node)
"Creating {0} virtual interfaces on node {1} using a pod".format(number, node)
)
create_ifb(kubecli, nummber, "modtools")
create_ifb(kubecli, number, pod_name)
logging.info("Deleting pod used to create virtual interfaces")
kubecli.delete_pod("modtools", "default")
kubecli.delete_pod(pod_name, "default")
def delete_virtual_interfaces(
@@ -598,11 +600,12 @@ def delete_virtual_interfaces(
"""
for node in node_list:
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
logging.info("Deleting all virtual interfaces on node {0}".format(node))
delete_ifb(kubecli, "modtools")
kubecli.delete_pod("modtools", "default")
delete_ifb(kubecli, "modtools-" + pod_name_regex)
kubecli.delete_pod("modtools-" + pod_name_regex, "default")
def create_ifb(kubecli: KrknKubernetes, number: int, pod_name: str):
@@ -652,15 +655,15 @@ def list_bridges(node: str, pod_template, kubecli: KrknKubernetes, image: str) -
Returns:
List of bridges on the node.
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
logging.info("Creating pod to query bridge on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"modtools-{pod_name_regex}"
try:
cmd = ["/host", "ovs-vsctl", "list-br"]
output = kubecli.exec_cmd_in_pod(
cmd, "modtools", "default", base_command="chroot"
cmd, pod_name, "default", base_command="chroot"
)
if not output:
@@ -671,7 +674,7 @@ def list_bridges(node: str, pod_template, kubecli: KrknKubernetes, image: str) -
finally:
logging.info("Deleting pod to query interface on node")
kubecli.delete_pod("modtools", "default")
kubecli.delete_pod(pod_name, "default")
return bridges
@@ -704,11 +707,11 @@ def check_cookie(
Returns
Returns the matching flow rules
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name = pod_name_regex,nodename=node, image=image))
logging.info("Creating pod to query duplicate rules on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"modtools-{pod_name_regex}"
try:
cmd = [
"chroot",
@@ -721,7 +724,7 @@ def check_cookie(
f"cookie={cookie}/-1",
]
output = kubecli.exec_cmd_in_pod(
cmd, "modtools", "default", base_command="chroot"
cmd, pod_name, "default", base_command="chroot"
)
if not output:
@@ -732,7 +735,7 @@ def check_cookie(
finally:
logging.info("Deleting pod to query interface on node")
kubecli.delete_pod("modtools", "default")
kubecli.delete_pod(pod_name, "default")
return flow_list
@@ -763,12 +766,12 @@ def get_pod_interface(
Returns
Returns the pod interface name
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
logging.info("Creating pod to query pod interface on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
inf = ""
pod_name = f"modtools-{pod_name_regex}"
try:
if br_name == "br-int":
find_ip = f"external-ids:ip_addresses={ip}/23"
@@ -786,12 +789,12 @@ def get_pod_interface(
]
output = kubecli.exec_cmd_in_pod(
cmd, "modtools", "default", base_command="chroot"
cmd, pod_name, "default", base_command="chroot"
)
if not output:
cmd = ["/host", "ip", "addr", "show"]
output = kubecli.exec_cmd_in_pod(
cmd, "modtools", "default", base_command="chroot"
cmd, pod_name, "default", base_command="chroot"
)
for if_str in output.split("\n"):
if re.search(ip, if_str):
@@ -800,7 +803,7 @@ def get_pod_interface(
inf = output
finally:
logging.info("Deleting pod to query interface on node")
kubecli.delete_pod("modtools", "default")
kubecli.delete_pod(pod_name, "default")
return inf

View File

@@ -161,17 +161,19 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
self, test_interface, nodelst, template, kubecli: KrknKubernetes, image: str
):
pod_index = random.randint(0, len(nodelst) - 1)
pod_body = yaml.safe_load(template.render(nodename=nodelst[pod_index], image=image))
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(template.render(regex_name=pod_name_regex,nodename=nodelst[pod_index], image=image))
logging.info("Creating pod to query interface on node %s" % nodelst[pod_index])
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"fedtools-{pod_name_regex}"
try:
if test_interface == []:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.exec_cmd_in_pod(cmd, "fedtools", "default")
output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default")
test_interface = [output.replace("\n", "")]
else:
cmd = "ip -br addr show|awk -v ORS=',' '{print $1}'"
output = kubecli.exec_cmd_in_pod(cmd, "fedtools", "default")
output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default")
interface_lst = output[:-1].split(",")
for interface in test_interface:
if interface not in interface_lst:
@@ -183,7 +185,7 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
return test_interface
finally:
logging.info("Deleting pod to query interface on node")
kubecli.delete_pod("fedtools", "default")
kubecli.delete_pod(pod_name, "default")
# krkn_lib
def get_job_pods(self, api_response, kubecli: KrknKubernetes):

View File

@@ -1,7 +1,7 @@
apiVersion: v1
kind: Pod
metadata:
name: fedtools
name: fedtools-{{regex_name}}
spec:
hostNetwork: true
nodeName: {{nodename}}

View File

@@ -1,14 +1,16 @@
import logging
import random
import time
from asyncio import Future
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn_lib.k8s.pod_monitor import select_and_monitor_by_namespace_pattern_and_label, \
select_and_monitor_by_name_pattern_and_namespace_pattern
from krkn.scenario_plugins.pod_disruption.models.models import InputParams
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
from datetime import datetime
from dataclasses import dataclass
@@ -29,31 +31,23 @@ class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
try:
with open(scenario, "r") as f:
cont_scenario_config = yaml.full_load(f)
for kill_scenario in cont_scenario_config:
kill_scenario_config = InputParams(kill_scenario["config"])
self.start_monitoring(
kill_scenario_config, pool
future_snapshot=self.start_monitoring(
kill_scenario_config,
lib_telemetry
)
return_status = self.killing_pods(
self.killing_pods(
kill_scenario_config, lib_telemetry.get_lib_kubernetes()
)
if return_status != 0:
result = pool.cancel()
else:
result = pool.join()
if result.error:
logging.error(
logging.error(
f"PodDisruptionScenariosPlugin pods failed to recovery: {result.error}"
)
)
return 1
scenario_telemetry.affected_pods = result
snapshot = future_snapshot.result()
result = snapshot.get_pods_status()
scenario_telemetry.affected_pods = result
except (RuntimeError, Exception) as e:
logging.error("PodDisruptionScenariosPlugin exiting due to Exception %s" % e)
@@ -64,7 +58,7 @@ class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
def get_scenario_types(self) -> list[str]:
return ["pod_disruption_scenarios"]
def start_monitoring(self, kill_scenario: InputParams, pool: PodsMonitorPool):
def start_monitoring(self, kill_scenario: InputParams, lib_telemetry: KrknTelemetryOpenshift) -> Future:
recovery_time = kill_scenario.krkn_pod_recovery_time
if (
@@ -73,16 +67,17 @@ class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
):
namespace_pattern = kill_scenario.namespace_pattern
label_selector = kill_scenario.label_selector
pool.select_and_monitor_by_namespace_pattern_and_label(
future_snapshot = select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
v1_client=lib_telemetry.get_lib_kubernetes().cli
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod label pattern: {label_selector} namespace pattern: {namespace_pattern}"
)
return future_snapshot
elif (
kill_scenario.namespace_pattern
@@ -90,16 +85,17 @@ class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
):
namespace_pattern = kill_scenario.namespace_pattern
name_pattern = kill_scenario.name_pattern
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
future_snapshot = select_and_monitor_by_name_pattern_and_namespace_pattern(
pod_name_pattern=name_pattern,
namespace_pattern=namespace_pattern,
max_timeout=recovery_time,
field_selector="status.phase=Running"
v1_client=lib_telemetry.get_lib_kubernetes().cli
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
)
return future_snapshot
else:
raise Exception(
f"impossible to determine monitor parameters, check {kill_scenario} configuration"

View File

@@ -33,7 +33,7 @@ class ScenarioPluginFactory:
inherits from the AbstractScenarioPlugin abstract class
"""
if scenario_type in self.loaded_plugins:
return self.loaded_plugins[scenario_type]()
return self.loaded_plugins[scenario_type](scenario_type)
else:
raise ScenarioPluginNotFound(
f"Failed to load the {scenario_type} scenario plugin. "
@@ -61,7 +61,10 @@ class ScenarioPluginFactory:
continue
cls = getattr(module, name)
instance = cls()
# The AbstractScenarioPlugin constructor requires a scenario_type.
# However, since we only need to call `get_scenario_types()` here,
# it is acceptable to use a placeholder value.
instance = cls("placeholder_scenario_type")
get_scenario_type = getattr(instance, "get_scenario_types")
scenario_types = get_scenario_type()
has_duplicates = False

View File

@@ -5,7 +5,7 @@ import yaml
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn_lib.utils import get_yaml_item_value
class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
def run(
@@ -25,6 +25,8 @@ class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
image = scenario_config["image"]
target_port = scenario_config["service_target_port"]
chaos_duration = scenario_config["chaos_duration"]
privileged = get_yaml_item_value(scenario_config,"privileged", True)
logging.info(
f"checking service {service_name} in namespace: {service_namespace}"
@@ -46,14 +48,14 @@ class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
logging.info(f"webservice will listen on port {target_port}")
webservice = (
lib_telemetry.get_lib_kubernetes().deploy_service_hijacking(
service_namespace, plan, image, port_number=target_port
service_namespace, plan, image, port_number=target_port, privileged=privileged
)
)
else:
logging.info(f"traffic will be redirected to named port: {target_port}")
webservice = (
lib_telemetry.get_lib_kubernetes().deploy_service_hijacking(
service_namespace, plan, image, port_name=target_port
service_namespace, plan, image, port_name=target_port, privileged=privileged
)
)
logging.info(

135
krkn/utils/VirtChecker.py Normal file
View File

@@ -0,0 +1,135 @@
import time
import logging
import queue
from datetime import datetime
from krkn_lib.models.telemetry.models import VirtCheck
from krkn.invoke.command import invoke_no_exit
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
from krkn_lib.k8s import KrknKubernetes
import threading
from krkn_lib.utils.functions import get_yaml_item_value
class VirtChecker:
current_iterations: int = 0
ret_value = 0
def __init__(self, kubevirt_check_config, iterations, krkn_lib: KrknKubernetes, threads_limt=20):
self.iterations = iterations
self.namespace = get_yaml_item_value(kubevirt_check_config, "namespace", "")
self.vm_list = []
self.threads = []
self.threads_limit = threads_limt
if self.namespace == "":
logging.info("kube virt checks config is not defined, skipping them")
return
vmi_name_match = get_yaml_item_value(kubevirt_check_config, "name", ".*")
self.krkn_lib = krkn_lib
self.disconnected = get_yaml_item_value(kubevirt_check_config, "disconnected", False)
self.only_failures = get_yaml_item_value(kubevirt_check_config, "only_failures", False)
self.interval = get_yaml_item_value(kubevirt_check_config, "interval", 2)
try:
self.kube_vm_plugin = KubevirtVmOutageScenarioPlugin()
self.kube_vm_plugin.init_clients(k8s_client=krkn_lib)
vmis = self.kube_vm_plugin.get_vmis(vmi_name_match,self.namespace)
except Exception as e:
logging.error('Virt Check init exception: ' + str(e))
return
for vmi in vmis:
node_name = vmi.get("status",{}).get("nodeName")
vmi_name = vmi.get("metadata",{}).get("name")
ip_address = vmi.get("status",{}).get("interfaces",[])[0].get("ipAddress")
self.vm_list.append(VirtCheck({'vm_name':vmi_name, 'ip_address': ip_address, 'namespace':self.namespace, 'node_name':node_name}))
def check_disconnected_access(self, ip_address: str, worker_name:str = ''):
virtctl_vm_cmd = f"ssh core@{worker_name} 'ssh -o BatchMode=yes -o ConnectTimeout=2 -o StrictHostKeyChecking=no root@{ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
if 'True' in invoke_no_exit(virtctl_vm_cmd):
return True
else:
return False
def get_vm_access(self, vm_name: str = '', namespace: str = ''):
"""
This method returns True when the VM is access and an error message when it is not, using virtctl protocol
:param vm_name:
:param namespace:
:return: virtctl_status 'True' if successful, or an error message if it fails.
"""
virtctl_vm_cmd = f"virtctl ssh --local-ssh-opts='-o BatchMode=yes' --local-ssh-opts='-o PasswordAuthentication=no' --local-ssh-opts='-o ConnectTimeout=2' root@vmi/{vm_name} -n {namespace} 2>&1 |egrep 'denied|verification failed' && echo 'True' || echo 'False'"
check_virtctl_vm_cmd = f"virtctl ssh --local-ssh-opts='-o BatchMode=yes' --local-ssh-opts='-o PasswordAuthentication=no' --local-ssh-opts='-o ConnectTimeout=2' root@{vm_name} -n {namespace} 2>&1 |egrep 'denied|verification failed' && echo 'True' || echo 'False'"
if 'True' in invoke_no_exit(check_virtctl_vm_cmd):
return True
else:
second_invoke = invoke_no_exit(virtctl_vm_cmd)
if 'True' in second_invoke:
return True
return False
def thread_join(self):
for thread in self.threads:
thread.join()
def batch_list(self, queue: queue.Queue, batch_size=20):
# Provided prints to easily visualize how the threads are processed.
for i in range (0, len(self.vm_list),batch_size):
sub_list = self.vm_list[i: i+batch_size]
index = i
t = threading.Thread(target=self.run_virt_check,name=str(index), args=(sub_list,queue))
self.threads.append(t)
t.start()
def run_virt_check(self, vm_list_batch, virt_check_telemetry_queue: queue.Queue):
virt_check_telemetry = []
virt_check_tracker = {}
while self.current_iterations < self.iterations:
for vm in vm_list_batch:
try:
if not self.disconnected:
vm_status = self.get_vm_access(vm.vm_name, vm.namespace)
else:
vm_status = self.check_disconnected_access(vm.ip_address, vm.node_name)
except Exception:
vm_status = False
if vm.vm_name not in virt_check_tracker:
start_timestamp = datetime.now()
virt_check_tracker[vm.vm_name] = {
"vm_name": vm.vm_name,
"ip_address": vm.ip_address,
"namespace": vm.namespace,
"node_name": vm.node_name,
"status": vm_status,
"start_timestamp": start_timestamp
}
else:
if vm_status != virt_check_tracker[vm.vm_name]["status"]:
end_timestamp = datetime.now()
start_timestamp = virt_check_tracker[vm.vm_name]["start_timestamp"]
duration = (end_timestamp - start_timestamp).total_seconds()
virt_check_tracker[vm.vm_name]["end_timestamp"] = end_timestamp.isoformat()
virt_check_tracker[vm.vm_name]["duration"] = duration
virt_check_tracker[vm.vm_name]["start_timestamp"] = start_timestamp.isoformat()
if self.only_failures:
if not virt_check_tracker[vm.vm_name]["status"]:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm.vm_name]))
else:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm.vm_name]))
del virt_check_tracker[vm.vm_name]
time.sleep(self.interval)
virt_check_end_time_stamp = datetime.now()
for vm in virt_check_tracker.keys():
final_start_timestamp = virt_check_tracker[vm]["start_timestamp"]
final_duration = (virt_check_end_time_stamp - final_start_timestamp).total_seconds()
virt_check_tracker[vm]["end_timestamp"] = virt_check_end_time_stamp.isoformat()
virt_check_tracker[vm]["duration"] = final_duration
virt_check_tracker[vm]["start_timestamp"] = final_start_timestamp.isoformat()
if self.only_failures:
if not virt_check_tracker[vm]["status"]:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm]))
else:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm]))
virt_check_telemetry_queue.put(virt_check_telemetry)

View File

@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.6
krkn-lib==5.1.0
krkn-lib==5.1.6
lxml==5.1.0
kubernetes==28.1.0
numpy==1.26.4

View File

@@ -11,6 +11,7 @@ import uuid
import time
import queue
import threading
from typing import Optional
from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.models.elastic import ElasticChaosRunTelemetry
@@ -28,10 +29,16 @@ from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
from krkn.utils import TeeLogHandler
from krkn.utils.HealthChecker import HealthChecker
from krkn.utils.VirtChecker import VirtChecker
from krkn.scenario_plugins.scenario_plugin_factory import (
ScenarioPluginFactory,
ScenarioPluginNotFound,
)
from krkn.rollback.config import RollbackConfig
from krkn.rollback.command import (
list_rollback as list_rollback_command,
execute_rollback as execute_rollback_command,
)
# removes TripleDES warning
import warnings
@@ -39,13 +46,13 @@ warnings.filterwarnings(action='ignore', module='.*paramiko.*')
report_file = ""
# Main function
def main(cfg) -> int:
def main(options, command: Optional[str]) -> int:
# Start kraken
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
cfg = options.cfg
# Parse and read the config
if os.path.isfile(cfg):
with open(cfg, "r") as f:
@@ -61,6 +68,18 @@ def main(cfg) -> int:
config["kraken"], "publish_kraken_status", False
)
port = get_yaml_item_value(config["kraken"], "port", 8081)
RollbackConfig.register(
auto=get_yaml_item_value(
config["kraken"],
"auto_rollback",
False
),
versions_directory=get_yaml_item_value(
config["kraken"],
"rollback_versions_directory",
"/tmp/kraken-rollback"
),
)
signal_address = get_yaml_item_value(
config["kraken"], "signal_address", "0.0.0.0"
)
@@ -112,7 +131,8 @@ def main(cfg) -> int:
config["performance_monitoring"], "check_critical_alerts", False
)
telemetry_api_url = config["telemetry"].get("api_url")
health_check_config = config["health_checks"]
health_check_config = get_yaml_item_value(config, "health_checks",{})
kubevirt_check_config = get_yaml_item_value(config, "kubevirt_checks", {})
# Initialize clients
if not os.path.isfile(kubeconfig_path) and not os.path.isfile(
@@ -231,6 +251,19 @@ def main(cfg) -> int:
logging.info("Server URL: %s" % kubecli.get_host())
if command == "list-rollback":
sys.exit(
list_rollback_command(
options.run_uuid, options.scenario_type
)
)
elif command == "execute-rollback":
sys.exit(
execute_rollback_command(
telemetry_ocp, options.run_uuid, options.scenario_type
)
)
# Initialize the start iteration to 0
iteration = 0
@@ -293,6 +326,10 @@ def main(cfg) -> int:
args=(health_check_config, health_check_telemetry_queue))
health_check_worker.start()
kubevirt_check_telemetry_queue = queue.Queue()
kubevirt_checker = VirtChecker(kubevirt_check_config, iterations=iterations, krkn_lib=kubecli)
kubevirt_checker.batch_list(kubevirt_check_telemetry_queue)
# Loop to run the chaos starts here
while int(iteration) < iterations and run_signal != "STOP":
# Inject chaos scenarios specified in the config
@@ -354,6 +391,7 @@ def main(cfg) -> int:
iteration += 1
health_checker.current_iterations += 1
kubevirt_checker.current_iterations += 1
# telemetry
# in order to print decoded telemetry data even if telemetry collection
@@ -365,6 +403,17 @@ def main(cfg) -> int:
chaos_telemetry.health_checks = health_check_telemetry_queue.get_nowait()
except queue.Empty:
chaos_telemetry.health_checks = None
kubevirt_checker.thread_join()
kubevirt_check_telem = []
i =0
while i <= kubevirt_checker.threads_limit:
if not kubevirt_check_telemetry_queue.empty():
kubevirt_check_telem.extend(kubevirt_check_telemetry_queue.get_nowait())
else:
break
i+= 1
chaos_telemetry.virt_checks = kubevirt_check_telem
# if platform is openshift will be collected
# Cloud platform and network plugins metadata
@@ -519,7 +568,13 @@ def main(cfg) -> int:
if __name__ == "__main__":
# Initialize the parser to read the config
parser = optparse.OptionParser()
parser = optparse.OptionParser(
usage="%prog [options] [command]\n\n"
"Commands:\n"
" list-rollback List rollback version files in a tree-like format\n"
" execute-rollback Execute rollback version files and cleanup if successful\n\n"
"If no command is specified, kraken will run chaos scenarios.",
)
parser.add_option(
"-c",
"--config",
@@ -556,7 +611,26 @@ if __name__ == "__main__":
default=None,
)
# Add rollback command options
parser.add_option(
"-r",
"--run_uuid",
dest="run_uuid",
help="run UUID to filter rollback operations",
default=None,
)
parser.add_option(
"-s",
"--scenario_type",
dest="scenario_type",
help="scenario type to filter rollback operations",
default=None,
)
(options, args) = parser.parse_args()
# If no command or regular execution, continue with existing logic
report_file = options.output
tee_handler = TeeLogHandler()
handlers = [
@@ -625,7 +699,9 @@ if __name__ == "__main__":
if option_error:
retval = 1
else:
retval = main(options.cfg)
# Check if command is provided as positional argument
command = args[0] if args else None
retval = main(options, command)
junit_endtime = time.time()

View File

@@ -5,6 +5,7 @@ service_name: nginx-service # name of the service to be hijacked
service_namespace: default # The namespace where the target service is located
image: quay.io/krkn-chaos/krkn-service-hijacking:v0.1.3 # Image of the krkn web service to be deployed to receive traffic.
chaos_duration: 30 # Total duration of the chaos scenario in seconds.
privileged: True # True or false if need privileged securityContext to run
plan:
- resource: "/list/index.php" # Specifies the resource or path to respond to in the scenario. For paths, both the path and query parameters are captured but ignored.
# For resources, only query parameters are captured.

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,63 @@
import logging
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
class SimpleRollbackScenarioPlugin(AbstractScenarioPlugin):
"""
Mock implementation of RollbackScenarioPlugin for testing purposes.
This plugin does not perform any actual rollback operations.
"""
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
logger.info(
f"Setting rollback callable for run {run_uuid} with scenario {scenario}."
)
logger.debug(f"Krkn config: {krkn_config}")
self.rollback_handler.set_rollback_callable(
self.rollback_callable,
RollbackContent(
resource_identifier=run_uuid,
),
)
logger.info("Rollback callable set successfully.")
print("Rollback callable has been set for the scenario.")
return 0
def get_scenario_types(self) -> list[str]:
"""
Returns the scenario types that this plugin supports.
:return: a list of scenario types
"""
return ["simple_rollback_scenario"]
@staticmethod
def rollback_callable(
rollback_context: RollbackContent, lib_telemetry: KrknTelemetryOpenshift
):
"""
Simple rollback callable that simulates a rollback operation.
"""
run_uuid = rollback_context.resource_identifier
print(f"Rollback called for run {run_uuid}.")
# Simulate a rollback operation
# In a real scenario, this would contain logic to revert changes made during the scenario execution.
print("Rollback operation completed successfully.")

160
tests/test_rollback.py Normal file
View File

@@ -0,0 +1,160 @@
import pytest
import logging
import os
import sys
import uuid
import subprocess
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.ocp import KrknOpenshift
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils import SafeLogger
from krkn.rollback.config import RollbackConfig
sys.path.append(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
) # Adjust path to include krkn
TEST_LOGS_DIR = "/tmp/krkn_test_rollback_logs_directory"
TEST_VERSIONS_DIR = "/tmp/krkn_test_rollback_versions_directory"
class TestRollbackScenarioPlugin:
def validate_rollback_directory(
self, run_uuid: str, scenario: str, versions: int = 1
) -> list[str]:
"""
Validate that the rollback directory exists and contains version files.
:param run_uuid: The UUID for current run, used to identify the rollback context directory.
:param scenario: The name of the scenario to validate.
:param versions: The expected number of version files.
:return: List of version files in full path.
"""
rollback_context_directories = [
dirname for dirname in os.listdir(TEST_VERSIONS_DIR) if run_uuid in dirname
]
assert len(rollback_context_directories) == 1, (
f"Expected one directory for run UUID {run_uuid}, found: {rollback_context_directories}"
)
scenario_rollback_versions_directory = os.path.join(
TEST_VERSIONS_DIR, rollback_context_directories[0]
)
version_files = os.listdir(scenario_rollback_versions_directory)
assert len(version_files) == versions, (
f"Expected {versions} version files, found: {len(version_files)}"
)
for version_file in version_files:
assert version_file.startswith(scenario), (
f"Version file {version_file} does not start with '{scenario}'"
)
assert version_file.endswith(".py"), (
f"Version file {version_file} does not end with '.py'"
)
return [
os.path.join(scenario_rollback_versions_directory, vf)
for vf in version_files
]
def execute_version_file(self, version_file: str):
"""
Execute a rollback version file using subprocess.
:param version_file: The path to the version file to execute.
"""
print(f"Executing rollback version file: {version_file}")
result = subprocess.run(
[sys.executable, version_file],
capture_output=True,
text=True,
)
assert result.returncode == 0, (
f"Rollback version file {version_file} failed with return code {result.returncode}. "
f"Output: {result.stdout}, Error: {result.stderr}"
)
print(
f"Rollback version file executed successfully: {version_file} with output: {result.stdout}"
)
@pytest.fixture(autouse=True)
def setup_logging(self):
os.makedirs(TEST_LOGS_DIR, exist_ok=True)
# setup logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(os.path.join(TEST_LOGS_DIR, "test_rollback.log")),
logging.StreamHandler(),
],
)
@pytest.fixture(scope="module")
def kubeconfig_path(self):
# Provide the path to the kubeconfig file for testing
return os.getenv("KUBECONFIG", "~/.kube/config")
@pytest.fixture(scope="module")
def safe_logger(self):
os.makedirs(TEST_LOGS_DIR, exist_ok=True)
with open(os.path.join(TEST_LOGS_DIR, "telemetry.log"), "w") as f:
pass # Create the file if it doesn't exist
yield SafeLogger(filename=os.path.join(TEST_LOGS_DIR, "telemetry.log"))
@pytest.fixture(scope="module")
def kubecli(self, kubeconfig_path):
yield KrknKubernetes(kubeconfig_path=kubeconfig_path)
@pytest.fixture(scope="module")
def lib_openshift(self, kubeconfig_path):
yield KrknOpenshift(kubeconfig_path=kubeconfig_path)
@pytest.fixture(scope="module")
def lib_telemetry(self, lib_openshift, safe_logger):
yield KrknTelemetryOpenshift(
safe_logger=safe_logger,
lib_openshift=lib_openshift,
)
@pytest.fixture(scope="module")
def scenario_telemetry(self):
yield ScenarioTelemetry()
@pytest.fixture(scope="module")
def setup_rollback_config(self):
RollbackConfig.register(
auto=False,
versions_directory=TEST_VERSIONS_DIR,
)
@pytest.mark.usefixtures("setup_rollback_config")
def test_simple_rollback_scenario_plugin(self, lib_telemetry, scenario_telemetry):
from tests.rollback_scenario_plugins.simple import SimpleRollbackScenarioPlugin
scenario_type = "simple_rollback_scenario"
simple_rollback_scenario_plugin = SimpleRollbackScenarioPlugin(
scenario_type=scenario_type,
)
run_uuid = str(uuid.uuid4())
simple_rollback_scenario_plugin.run(
run_uuid=run_uuid,
scenario="test_scenario",
krkn_config={
"key1": "value",
"key2": False,
"key3": 123,
"key4": ["value1", "value2", "value3"],
},
lib_telemetry=lib_telemetry,
scenario_telemetry=scenario_telemetry,
)
# Validate the rollback directory and version files do exist
version_files = self.validate_rollback_directory(
run_uuid,
scenario_type,
)
# Execute the rollback version file
for version_file in version_files:
self.execute_version_file(version_file)