Compare commits

...

26 Commits

Author SHA1 Message Date
Tullio Sebastiani
99b14c5652 filled the already checked activities
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-11-24 16:11:57 +01:00
Tullio Sebastiani
e377faa0e3 added incubation checklist
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-11-24 16:02:15 +01:00
FAUST.
b462c46b28 feat:Add exlude_label in container scenario (#966)
* feat:Add exlude_label in container scenario

Signed-off-by: zhoujinyu <2319109590@qq.com>

* refactor:use list_pods with exclude_label in container scenario

Signed-off-by: zhoujinyu <2319109590@qq.com>

---------

Signed-off-by: zhoujinyu <2319109590@qq.com>
Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
2025-11-24 15:59:36 +01:00
FAUST.
ab4ae85896 feat:Add exclude label to application outage (#967)
* feat:Add exclude label to application outage

Signed-off-by: zhoujinyu <2319109590@qq.com>

* chore: add missing comments

Signed-off-by: zhoujinyu <2319109590@qq.com>

* chore: adjust comments

Signed-off-by: zhoujinyu <2319109590@qq.com>

---------

Signed-off-by: zhoujinyu <2319109590@qq.com>
2025-11-24 15:54:05 +01:00
Paige Patton
6acd6f9bd3 adding common vars for new kubevirt checks (#973)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-21 09:51:46 -05:00
Paige Patton
787759a591 removing pycache from files found (#972)
Assisted By: Claude Code

Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-21 09:50:35 -05:00
Paige Patton
957cb355be not properly getting auto variable in RollbackConfig (#971)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-21 09:50:20 -05:00
Paige Patton
35609484d4 fixing batch size limit (#964)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-21 09:47:41 -05:00
LIU ZHE YOU
959337eb63 [Rollback Scenario] Refactor execution (#895)
* Validate version file format

* Add validation for context dir, Exexcute all files by default

* Consolidate execute and cleanup, rename with .executed instead of
removing

* Respect auto_rollback config

* Add cleanup back but only for scenario successed

---------

Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
2025-11-19 14:14:06 +01:00
Sai Sanjay
f4bdbff9dc Add rollback functionality to SynFloodScenarioPlugin (#948)
* Add rollback functionality to SynFloodScenarioPlugin

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Refactor rollback pod handling in SynFloodScenarioPlugin to handle podnames as string

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Update resource identifier handling in SynFloodScenarioPlugin to use list format for rollback functionality

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Refactor chaos scenario configurations in config.yaml to comment out existing scenarios for clarity. Update rollback method in SynFloodScenarioPlugin to improve pod cleanup handling. Modify pvc_scenario.yaml with specific test values for better usability.

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Enhance rollback functionality in SynFloodScenarioPlugin by encoding pod names in base64 format for improved data handling.

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

---------

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>
Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>
Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
2025-11-19 11:18:50 +01:00
Sai Sanjay
954202cab7 Add rollback functionality to ServiceHijackingScenarioPlugin (#949)
* Add rollback functionality to ServiceHijackingScenarioPlugin

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Refactor rollback data handling in ServiceHijackingScenarioPlugin as json string

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Update rollback data handling in ServiceHijackingScenarioPlugin to decode directly from resource_identifier

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Add import statement for JSON handling in ServiceHijackingScenarioPlugin

This change introduces an import statement for the JSON module to facilitate the decoding of rollback data from the resource identifier.

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* feat: Enhance rollback data handling in ServiceHijackingScenarioPlugin by encoding and decoding as base64 strings.

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

---------

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>
Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>
Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
2025-11-19 11:18:15 +01:00
Paige Patton
a373dcf453 adding virt checker tests (#960)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-18 14:27:59 -05:00
Paige Patton
d0c604a516 timeout on main ssh to worker (#957)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-18 09:02:41 -05:00
Sai Sanjay
82582f5bc3 Add PVC Scenario Rollback Feature (#947)
* Add PVC outage scenario plugin to manage PVC annotations during outages

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Remove PvcOutageScenarioPlugin as it is no longer needed; refactor PvcScenarioPlugin to include rollback functionality for temporary file cleanup during PVC scenarios.

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Refactor rollback_data handling in PvcScenarioPlugin to use str() instead of json.dumps() for resource_identifier.

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* Import json module in PvcScenarioPlugin for decoding rollback data from resource_identifier.

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>

* feat: Encode rollback data in base64 format for resource_identifier in PvcScenarioPlugin to enhance data handling and security.

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

* feat: refactor: Update logging level from debug to info for temp file operations in PvcScenarioPlugin to improve visibility of command execution.

Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>

---------

Signed-off-by: sanjay7178 <saisanjay7660@gmail.com>
Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2025-11-18 08:10:44 -05:00
Paige Patton
37f0f1eb8b fixing spacing
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-18 02:25:09 -05:00
Paige Patton
d2eab21f95 adding centos image fix (#958)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-17 12:28:53 -05:00
Paige Patton
d84910299a typo (#956)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-13 13:23:58 -05:00
Harry C
48f19c0a0e Fix type: kubleci -> kubecli in time scenario exclude_label (#955)
Signed-off-by: Harry12980 <onlyharryc@gmail.com>
2025-11-13 13:15:36 -05:00
Paige Patton
eb86885bcd adding kube virt check failure (#952)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-13 10:37:42 -05:00
Paige Patton
967fd14bd7 adding namespace regex match (#954)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-13 09:44:20 -05:00
Harry C
5cefe80286 Add exclude_label parameter to time disruption scenario (#953)
Signed-off-by: Harry12980 <onlyharryc@gmail.com>
2025-11-13 15:21:55 +01:00
Paige Patton
9ee76ce337 post chaos (#939)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-11 14:11:04 -05:00
Tullio Sebastiani
fd3e7ee2c8 Fixes several Image cves (#941)
* fixes some CVEs on the base image

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* oc dependencies updated

* virtctl build

fix

removed virtctil installation

pip

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-11-11 19:50:12 +01:00
dependabot[bot]
c85c435b5d Bump werkzeug from 3.0.3 to 3.0.6 in /utils/chaos_ai/docker (#945)
Bumps [werkzeug](https://github.com/pallets/werkzeug) from 3.0.3 to 3.0.6.
- [Release notes](https://github.com/pallets/werkzeug/releases)
- [Changelog](https://github.com/pallets/werkzeug/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/werkzeug/compare/3.0.3...3.0.6)

---
updated-dependencies:
- dependency-name: werkzeug
  dependency-version: 3.0.6
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-11-11 19:48:47 +01:00
Paige Patton
d5284ace25 adding prometheus url to krknctl input (#943)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-11 13:45:27 -05:00
Paige Patton
c3098ec80b turning off es in ci tests (#944)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-11-11 12:51:10 -05:00
49 changed files with 3580 additions and 162 deletions

.coveragerc (new file, 4 lines)
View File

@@ -0,0 +1,4 @@
[run]
omit =
tests/*
krkn/tests/**

View File

@@ -47,6 +47,17 @@ jobs:
kubectl --namespace default port-forward $es_pod_name 9200 &
prom_name=$(kubectl get pods -n monitoring -l "app.kubernetes.io/name=prometheus" -o name)
kubectl --namespace monitoring port-forward $prom_name 9090 &
# Wait for Elasticsearch to be ready
echo "Waiting for Elasticsearch to be ready..."
for i in {1..30}; do
if curl -k -s -u elastic:$ELASTIC_PASSWORD https://localhost:9200/_cluster/health > /dev/null 2>&1; then
echo "Elasticsearch is ready!"
break
fi
echo "Attempt $i: Elasticsearch not ready yet, waiting..."
sleep 2
done
kubectl apply -f CI/templates/outage_pod.yaml
kubectl wait --for=condition=ready pod -l scenario=outage --timeout=300s
kubectl apply -f CI/templates/container_scenario_pod.yaml
@@ -73,7 +84,7 @@ jobs:
yq -i '.kraken.performance_monitoring="localhost:9090"' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=True' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=False' CI/config/common_test_config.yaml
yq -i '.elastic.password="${{env.ELASTIC_PASSWORD}}"' CI/config/common_test_config.yaml
yq -i '.performance_monitoring.prometheus_url="http://localhost:9090"' CI/config/common_test_config.yaml
echo "test_service_hijacking" > ./CI/tests/functional_tests
@@ -89,7 +100,6 @@ jobs:
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Push on main only steps + all other functional to collect coverage
# for the badge
- name: Configure AWS Credentials
@@ -105,7 +115,7 @@ jobs:
yq -i '.kraken.port="8081"' CI/config/common_test_config.yaml
yq -i '.kraken.signal_address="0.0.0.0"' CI/config/common_test_config.yaml
yq -i '.kraken.performance_monitoring="localhost:9090"' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=True' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=False' CI/config/common_test_config.yaml
yq -i '.elastic.password="${{env.ELASTIC_PASSWORD}}"' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
@@ -135,38 +145,38 @@ jobs:
cat ./CI/results.markdown >> $GITHUB_STEP_SUMMARY
echo >> $GITHUB_STEP_SUMMARY
- name: Upload CI logs
if: ${{ success() || failure() }}
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: ci-logs
path: CI/out
if-no-files-found: error
- name: Collect coverage report
if: ${{ success() || failure() }}
if: ${{ always() }}
run: |
python -m coverage html
python -m coverage json
- name: Publish coverage report to job summary
if: ${{ success() || failure() }}
if: ${{ always() }}
run: |
pip install html2text
html2text --ignore-images --ignore-links -b 0 htmlcov/index.html >> $GITHUB_STEP_SUMMARY
- name: Upload coverage data
if: ${{ success() || failure() }}
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: coverage
path: htmlcov
if-no-files-found: error
- name: Upload json coverage
if: ${{ success() || failure() }}
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: coverage.json
path: coverage.json
if-no-files-found: error
- name: Check CI results
if: ${{ success() || failure() }}
if: ${{ always() }}
run: "! grep Fail CI/results.markdown"
badge:

View File

@@ -51,7 +51,7 @@ spec:
claimName: kraken-test-pvc
containers:
- name: kraken-test-container
image: 'quay.io/centos7/httpd-24-centos7:latest'
image: 'quay.io/centos7/httpd-24-centos7:centos7'
volumeMounts:
- mountPath: "/home/krake-dir/"
name: kraken-test-pv

View File

@@ -19,6 +19,7 @@ function functional_test_app_outage {
kubectl get pods
envsubst < CI/config/common_test_config.yaml > CI/config/app_outage.yaml
cat $scenario_file
cat CI/config/app_outage.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/app_outage.yaml
echo "App outage scenario test: Success"
}

View File

@@ -10,7 +10,7 @@ function functional_test_pod_crash {
export scenario_file="scenarios/kind/pod_etcd.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
cat CI/config/pod_config.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/pod_config.yaml
echo "Pod disruption scenario test: Success"
}

INCUBATION_CHECKLIST.md (new file, 270 lines)
View File

@@ -0,0 +1,270 @@
# Review Project Moving Level Evaluation
[x] I have reviewed the TOC's [moving level readiness triage guide](https://github.com/cncf/toc/blob/main/operations/dd-toc-guide.md#initial-triageevaluation-prior-to-assignment), ensured the criteria for my project are met before opening this issue, and understand that unmet criteria will result in the project's application being closed.
# Krkn Incubation Application
v1.6
This template provides the project with a framework to inform the TOC of their conformance to the Incubation Level Criteria.
Project Repo(s): https://github.com/krkn-chaos/krkn
Project Site: https://www.krkn-chaos.dev
Sub-Projects:
- https://github.com/krkn-chaos/krknctl
- https://github.com/krkn-chaos/krkn-hub
Communication: [Slack](https://kubernetes.slack.com/archives/C05SFMHRWK1)
Project points of contacts:
- [Naga Ravi Elluri](mailto:nelluri@redhat.com)
- [Paige Patton](mailto:ppatton@redhat.com)
- [Tullio Sebastiani](mailto:tsebasti@redhat.com)
- [ ] (Post Incubation only) [Book a meeting with CNCF staff](http://project-meetings.cncf.io) to understand project benefits and event resources.
## Incubation Criteria Summary for Krkn
### Application Level Assertion
- [x] This project is currently Sandbox, accepted on 2023/12/19, and applying to Incubation.
- [x] This project is applying to join the CNCF at the Incubation level.
### Adoption Assertion
_The project has been adopted by the following organizations in a testing and integration or production capacity:_
| Organization | Since | Website | Use-Case |
|:-|:-|:-|:-|
| MarketAxess | 2024 | https://www.marketaxess.com/ | Kraken enables us to achieve our goal of increasing the reliability of our cloud products on Kubernetes. The tool allows us to automatically run various chaos scenarios, identify resilience and performance bottlenecks, and seamlessly restore the system to its original state once scenarios finish. These chaos scenarios include pod disruptions, node (EC2) outages, simulating availability zone (AZ) outages, and filling up storage spaces like EBS and EFS. The community is highly responsive to requests and works on expanding the tool's capabilities. MarketAxess actively contributes to the project, adding features such as the ability to leverage existing network ACLs and proposing several feature improvements to enhance test coverage. |
| Red Hat Openshift | 2020 | https://www.redhat.com/ | Kraken is a highly reliable chaos testing tool used to ensure the quality and resiliency of Red Hat Openshift. The engineering team runs all the test scenarios under Kraken on different cloud platforms on both self-managed and cloud services environments prior to the release of a new version of the product. The team also contributes to the Kraken project consistently which helps the test scenarios to keep up with the new features introduced to the product. Inclusion of this test coverage has contributed to gaining the trust of new and existing customers of the product. |
| IBM | 2023 | https://www.ibm.com/ | While working on AI for Chaos Testing at IBM Research, we closely collaborated with the Kraken (Krkn) team to advance intelligent chaos engineering. Our contributions included developing AI-enabled chaos injection strategies and integrating reinforcement learning (RL)-based fault search techniques into the Krkn tool, enabling it to identify and explore system vulnerabilities more efficiently. Kraken stands out as one of the most user-friendly and effective tools for chaos engineering, and the Kraken teams deep technical involvement played a crucial role in the success of this collaboration—helping bridge cutting-edge AI research with practical, real-world system reliability testing. |
## Application Process Principles
### Suggested
N/A
### Required
- [ ] **Engage with the domain specific TAG(s) to increase awareness through a presentation or completing a General Technical Review.**
- This was completed and occurred on DD-MMM-YYYY, and can be discovered at $LINK.
<!-- (Project assertion goes here) -->
- [x] **All project metadata and resources are [vendor-neutral](https://contribute.cncf.io/maintainers/community/vendor-neutrality/).**
<!-- (Project assertion goes here) -->
- [ ] **Review and acknowledgement of expectations for [Sandbox](https://sandbox.cncf.io) projects and requirements for moving forward through the CNCF Maturity levels.**
- Met during Project's application on DD-MMM-YYYY.
<!-- (Project assertion goes here) -->
- [ ] **Due Diligence Review.**
Completion of this due diligence document, resolution of concerns raised, and presented for public comment satisfies the Due Diligence Review criteria.
- [ ] **Additional documentation as appropriate for project type, e.g.: installation documentation, end user documentation, reference implementation and/or code samples.**
<!-- (Project assertion goes here) -->
## Governance and Maintainers
Note: this section may be augmented by the completion of a Governance Review from the Project Reviews subproject.
### Suggested
- [ ] **Governance has continuously been iterated upon by the project as a result of their experience applying it, with the governance history demonstrating evolution of maturity alongside the project's maturity evolution.**
<!-- (Project assertion goes here) -->
- [ ] **Clear and discoverable project governance documentation.**
<!-- (Project assertion goes here) -->
- [ ] **Governance is up to date with actual project activities, including any meetings, elections, leadership, or approval processes.**
<!-- (Project assertion goes here) -->
- [ ] **Governance clearly documents [vendor-neutrality](https://contribute.cncf.io/maintainers/community/vendor-neutrality/) of project direction.**
<!-- (Project assertion goes here) -->
- [ ] **Document how the project makes decisions on leadership, contribution acceptance, requests to the CNCF, and changes to governance or project goals.**
<!-- (Project assertion goes here) -->
- [ ] **Document how role, function-based members, or sub-teams are assigned, onboarded, and removed for specific teams (example: Security Response Committee).**
<!-- (Project assertion goes here) -->
- [ ] **Document a complete maintainer lifecycle process (including roles, onboarding, offboarding, and emeritus status).**
<!-- (Project assertion goes here) -->
- [ ] **Demonstrate usage of the maintainer lifecycle with outcomes, either through the addition or replacement of maintainers as project events have required.**
<!-- (Project assertion goes here) -->
- [ ] **If the project has subprojects: subproject leadership, contribution, maturity status documented, including add/remove process.**
<!-- (Project assertion goes here) -->
### Required
- [x] **Document complete list of current maintainers, including names, contact information, domain of responsibility, and affiliation.**
<!-- (Project assertion goes here) -->
- [x] **A number of active maintainers which is appropriate to the size and scope of the project.**
<!-- (Project assertion goes here) -->
- [x] **Code and Doc ownership in Github and elsewhere matches documented governance roles.**
<!-- (Project assertion goes here) -->
- [x] **Document adoption and adherence to the CNCF Code of Conduct or the project's CoC which is based off the CNCF CoC and not in conflict with it.**
<!-- (Project assertion goes here) -->
- [x] **CNCF Code of Conduct is cross-linked from other governance documents.**
<!-- (Project assertion goes here) -->
- [x] **All subprojects, if any, are listed.**
<!-- (Project assertion goes here) -->
## Contributors and Community
Note: this section may be augmented by the completion of a Governance Review from the Project Reviews subproject.
### Suggested
- [ ] **Contributor ladder with multiple roles for contributors.**
<!-- (Project assertion goes here) -->
### Required
- [x] **Clearly defined and discoverable process to submit issues or changes.**
<!-- (Project assertion goes here) -->
- [x] **Project must have, and document, at least one public communications channel for users and/or contributors.**
<!-- (Project assertion goes here) -->
- [ ] **List and document all project communication channels, including subprojects (mail list/slack/etc.). List any non-public communications channels and what their special purpose is.**
<!-- (Project assertion goes here) -->
- [x] **Up-to-date public meeting schedulers and/or integration with CNCF calendar.**
<!-- (Project assertion goes here) -->
- [ ] **Documentation of how to contribute, with increasing detail as the project matures.**
<!-- (Project assertion goes here) -->
- [x] **Demonstrate contributor activity and recruitment.**
<!-- (Project assertion goes here) -->
## Engineering Principles
### Suggested
- [ ] **Roadmap change process is documented.**
<!-- (Project assertion goes here) -->
- [ ] **History of regular, quality releases.**
<!-- (Project assertion goes here) -->
### Required
- [ ] **Document project goals and objectives that illustrate the projects differentiation in the Cloud Native landscape as well as outlines how this project fulfills an outstanding need and/or solves a problem differently. _This can also be satisfied by completing a General Technical Review._**
- _If applicable_ a General Technical Review was completed/updated on DD-MMM-YYYY, and can be discovered at $LINK.
<!-- (Project assertion goes here) -->
- [ ] **Document what the project does, and why it does it - including viable cloud native use cases. This can also be satisfied by completing a General Technical Review.**
<!-- (Project assertion goes here) -->
- [ ] **Document and maintain a public roadmap or other forward looking planning document or tracking mechanism.**
<!-- (Project assertion goes here) -->
- [ ] **Document overview of project architecture and software design that demonstrates viable cloud native use cases, as part of the project's documentation. _This can also be satisfied by completing a General Technical Review and capturing the output in the project's documentation._**
- _If applicable_ a General Technical Review was completed/updated on DD-MMM-YYYY, and can be discovered at $LINK.
<!-- (Project assertion goes here) -->
- [x] **Document the project's release process.**
<!-- (Project assertion goes here) -->
## Security
### Suggested
N/A
### Required
Note: this section may be augmented by a joint-assessment performed by TAG Security and Compliance.
- [x] **Clearly defined and discoverable process to report security issues.**
<!-- (Project assertion goes here) -->
- [x] **Enforcing Access Control Rules to secure the code base against attacks (Example: two factor authentication enforcement, and/or use of ACL tools.)**
<!-- (Project assertion goes here) -->
- [x] **Document assignment of security response roles and how reports are handled.**
<!-- (Project assertion goes here) -->
- [x] **Document [Security Self-Assessment](https://tag-security.cncf.io/community/assessments/guide/self-assessment/).**
<!-- (Project assertion goes here) -->
- [ ] **Achieve the Open Source Security Foundation (OpenSSF) Best Practices passing badge.**
<!-- (Project assertion goes here) -->
## Ecosystem
### Suggested
N/A
### Required
- [ ] **Publicly documented list of adopters, which may indicate their adoption level (dev/trialing, prod, etc.)**
<!-- (Project assertion goes here) -->
- [ ] **Used in appropriate capacity by at least 3 independent + indirect/direct adopters, (these are not required to be in the publicly documented list of adopters)**
<!-- (Project assertion goes here) -->
The project provided the TOC with a list of adopters for verification of use of the project at the level expected, i.e. production use for graduation, dev/test for incubation.
- [ ] **TOC verification of adopters.**
<!-- (Project assertion goes here) -->
Refer to the Adoption portion of this document.
- [ ] **Clearly documented integrations and/or compatibility with other CNCF projects as well as non-CNCF projects.**
<!-- (Project assertion goes here) -->
## Additional Information
<!-- Provide any additional information you feel is relevant for the TOC in conducting due diligence on this project. -->

View File

@@ -126,4 +126,6 @@ kubevirt_checks: # Utilizing virt che
name: # Regex Name style of VMI's to watch, optional, will watch all VMI names in the namespace if left blank
only_failures: False # Boolean of whether to show all VMI's failures and successful ssh connection (False), or only failure status' (True)
disconnected: False # Boolean of how to try to connect to the VMIs; if True will use the ip_address to try ssh from within a node, if false will use the name and uses virtctl to try to connect; Default is False
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
ssh_node: "" # If set, will be a backup way to ssh to a node. Will want to set to a node that isn't targeted in chaos
node_names: ""
exit_on_failure: # If value is True and VMI's are failing post chaos returns failure, values can be True/False

View File

@@ -1,22 +1,35 @@
# oc build
FROM golang:1.23.1 AS oc-build
FROM golang:1.24.9 AS oc-build
RUN apt-get update && apt-get install -y --no-install-recommends libkrb5-dev
WORKDIR /tmp
# oc build
RUN git clone --branch release-4.18 https://github.com/openshift/oc.git
WORKDIR /tmp/oc
RUN go mod edit -go 1.23.1 &&\
go get github.com/moby/buildkit@v0.12.5 &&\
go get github.com/containerd/containerd@v1.7.11&&\
go get github.com/docker/docker@v25.0.6&&\
go get github.com/opencontainers/runc@v1.1.14&&\
go get github.com/go-git/go-git/v5@v5.13.0&&\
go get golang.org/x/net@v0.38.0&&\
go get github.com/containerd/containerd@v1.7.27&&\
go get golang.org/x/oauth2@v0.27.0&&\
go get golang.org/x/crypto@v0.35.0&&\
RUN go mod edit -go 1.24.9 &&\
go mod edit -require github.com/moby/buildkit@v0.12.5 &&\
go mod edit -require github.com/containerd/containerd@v1.7.29&&\
go mod edit -require github.com/docker/docker@v27.5.1+incompatible&&\
go mod edit -require github.com/opencontainers/runc@v1.2.8&&\
go mod edit -require github.com/go-git/go-git/v5@v5.13.0&&\
go mod edit -require github.com/opencontainers/selinux@v1.13.0&&\
go mod edit -require github.com/ulikunitz/xz@v0.5.15&&\
go mod edit -require golang.org/x/net@v0.38.0&&\
go mod edit -require github.com/containerd/containerd@v1.7.27&&\
go mod edit -require golang.org/x/oauth2@v0.27.0&&\
go mod edit -require golang.org/x/crypto@v0.35.0&&\
go mod edit -replace github.com/containerd/containerd@v1.7.27=github.com/containerd/containerd@v1.7.29&&\
go mod tidy && go mod vendor
RUN make GO_REQUIRED_MIN_VERSION:= oc
# virtctl build
WORKDIR /tmp
RUN git clone https://github.com/kubevirt/kubevirt.git
WORKDIR /tmp/kubevirt
RUN go mod edit -go 1.24.9 &&\
go work use &&\
go build -o virtctl ./cmd/virtctl/
FROM fedora:40
ARG PR_NUMBER
ARG TAG
@@ -31,13 +44,9 @@ RUN dnf update && dnf install -y --setopt=install_weak_deps=False \
git python39 jq yq gettext wget which ipmitool openssh-server &&\
dnf clean all
# Virtctl
RUN export VERSION=$(curl https://storage.googleapis.com/kubevirt-prow/release/kubevirt/kubevirt/stable.txt) && \
wget https://github.com/kubevirt/kubevirt/releases/download/${VERSION}/virtctl-${VERSION}-linux-amd64 && \
chmod +x virtctl-${VERSION}-linux-amd64 && sudo mv virtctl-${VERSION}-linux-amd64 /usr/local/bin/virtctl
# copy oc client binary from oc-build image
COPY --from=oc-build /tmp/oc/oc /usr/bin/oc
COPY --from=oc-build /tmp/kubevirt/virtctl /usr/bin/virtctl
# krkn build
RUN git clone https://github.com/krkn-chaos/krkn.git /home/krkn/kraken && \
@@ -56,6 +65,11 @@ RUN if [ -n "$TAG" ]; then git checkout "$TAG";fi
RUN python3.9 -m ensurepip --upgrade --default-pip
RUN python3.9 -m pip install --upgrade pip setuptools==78.1.1
# removes the the vulnerable versions of setuptools and pip
RUN rm -rf "$(pip cache dir)"
RUN rm -rf /tmp/*
RUN rm -rf /usr/local/lib/python3.9/ensurepip/_bundled
RUN pip3.9 install -r requirements.txt
RUN pip3.9 install jsonschema

View File

@@ -85,6 +85,24 @@
"default": "False",
"required": "false"
},
{
"name": "prometheus-url",
"short_description": "Prometheus url",
"description": "Prometheus url for when running on kuberenetes",
"variable": "PROMETHEUS_URL",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "prometheus-token",
"short_description": "Prometheus bearer token",
"description": "Prometheus bearer token for prometheus url authentication",
"variable": "PROMETHEUS_TOKEN",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "uuid",
"short_description": "Sets krkn run uuid",
@@ -501,6 +519,26 @@
"default": "",
"required": "false"
},
{
"name": "kubevirt-exit-on-failure",
"short_description": "KubeVirt fail if failed vms at end of run",
"description": "KubeVirt fails run if vms still have false status",
"variable": "KUBE_VIRT_EXIT_ON_FAIL",
"type": "enum",
"allowed_values": "True,False,true,false",
"separator": ",",
"default": "False",
"required": "false"
},
{
"name": "kubevirt-node-node",
"short_description": "KubeVirt node to filter vms on",
"description": "Only track VMs in KubeVirt on given node name",
"variable": "KUBE_VIRT_NODE_NAME",
"type": "string",
"default": "",
"required": "false"
},
{
"name": "krkn-debug",
"short_description": "Krkn debug mode",

View File

@@ -3,7 +3,7 @@ import logging
from typing import Optional, TYPE_CHECKING
from krkn.rollback.config import RollbackConfig
from krkn.rollback.handler import execute_rollback_version_files, cleanup_rollback_version_files
from krkn.rollback.handler import execute_rollback_version_files
@@ -96,24 +96,16 @@ def execute_rollback(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: Optional
:return: Exit code (0 for success, 1 for error)
"""
logging.info("Executing rollback version files")
if not run_uuid:
logging.error("run_uuid is required for execute-rollback command")
return 1
if not scenario_type:
logging.warning("scenario_type is not specified, executing all scenarios in rollback directory")
logging.info(f"Executing rollback for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
try:
# Execute rollback version files
logging.info(f"Executing rollback for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
execute_rollback_version_files(telemetry_ocp, run_uuid, scenario_type)
# If execution was successful, cleanup the version files
logging.info("Rollback execution completed successfully, cleaning up version files")
cleanup_rollback_version_files(run_uuid, scenario_type)
logging.info("Rollback execution and cleanup completed successfully")
execute_rollback_version_files(
telemetry_ocp,
run_uuid,
scenario_type,
ignore_auto_rollback_config=True
)
return 0
except Exception as e:

View File

@@ -108,7 +108,76 @@ class RollbackConfig(metaclass=SingletonMeta):
return f"{cls().versions_directory}/{rollback_context}"
@classmethod
def search_rollback_version_files(cls, run_uuid: str, scenario_type: str | None = None) -> list[str]:
def is_rollback_version_file_format(cls, file_name: str, expected_scenario_type: str | None = None) -> bool:
"""
Validate the format of a rollback version file name.
Expected format: <scenario_type>_<timestamp>_<hash_suffix>.py
where:
- scenario_type: string (can include underscores)
- timestamp: integer (nanoseconds since epoch)
- hash_suffix: alphanumeric string (length 8)
- .py: file extension
:param file_name: The name of the file to validate.
:param expected_scenario_type: The expected scenario type (if any) to validate against.
:return: True if the file name matches the expected format, False otherwise.
"""
if not file_name.endswith(".py"):
return False
parts = file_name.split("_")
if len(parts) < 3:
return False
scenario_type = "_".join(parts[:-2])
timestamp_str = parts[-2]
hash_suffix_with_ext = parts[-1]
hash_suffix = hash_suffix_with_ext[:-3]
if expected_scenario_type and scenario_type != expected_scenario_type:
return False
if not timestamp_str.isdigit():
return False
if len(hash_suffix) != 8 or not hash_suffix.isalnum():
return False
return True
@classmethod
def is_rollback_context_directory_format(cls, directory_name: str, expected_run_uuid: str | None = None) -> bool:
"""
Validate the format of a rollback context directory name.
Expected format: <timestamp>-<run_uuid>
where:
- timestamp: integer (nanoseconds since epoch)
- run_uuid: alphanumeric string
:param directory_name: The name of the directory to validate.
:param expected_run_uuid: The expected run UUID (if any) to validate against.
:return: True if the directory name matches the expected format, False otherwise.
"""
parts = directory_name.split("-", 1)
if len(parts) != 2:
return False
timestamp_str, run_uuid = parts
# Validate timestamp is numeric
if not timestamp_str.isdigit():
return False
# Validate run_uuid
if expected_run_uuid and expected_run_uuid != run_uuid:
return False
return True
@classmethod
def search_rollback_version_files(cls, run_uuid: str | None = None, scenario_type: str | None = None) -> list[str]:
"""
Search for rollback version files based on run_uuid and scenario_type.
@@ -123,34 +192,35 @@ class RollbackConfig(metaclass=SingletonMeta):
if not os.path.exists(cls().versions_directory):
return []
rollback_context_directories = [
dirname for dirname in os.listdir(cls().versions_directory) if run_uuid in dirname
]
rollback_context_directories = []
for dir in os.listdir(cls().versions_directory):
if cls.is_rollback_context_directory_format(dir, run_uuid):
rollback_context_directories.append(dir)
else:
logger.warning(f"Directory {dir} does not match expected pattern of <timestamp>-<run_uuid>")
if not rollback_context_directories:
logger.warning(f"No rollback context directories found for run UUID {run_uuid}")
return []
if len(rollback_context_directories) > 1:
logger.warning(
f"Expected one directory for run UUID {run_uuid}, found: {rollback_context_directories}"
)
rollback_context_directory = rollback_context_directories[0]
version_files = []
scenario_rollback_versions_directory = os.path.join(
cls().versions_directory, rollback_context_directory
)
for file in os.listdir(scenario_rollback_versions_directory):
# assert all files start with scenario_type and end with .py
if file.endswith(".py") and (scenario_type is None or file.startswith(scenario_type)):
version_files.append(
os.path.join(scenario_rollback_versions_directory, file)
)
else:
logger.warning(
f"File {file} does not match expected pattern for scenario type {scenario_type}"
)
for rollback_context_dir in rollback_context_directories:
rollback_context_dir = os.path.join(cls().versions_directory, rollback_context_dir)
for file in os.listdir(rollback_context_dir):
# Skip known non-rollback files/directories
if file == "__pycache__" or file.endswith(".executed"):
continue
if cls.is_rollback_version_file_format(file, scenario_type):
version_files.append(
os.path.join(rollback_context_dir, file)
)
else:
logger.warning(
f"File {file} does not match expected pattern of <{scenario_type or '*'}>_<timestamp>_<hash_suffix>.py"
)
return version_files
@dataclass(frozen=True)
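For illustration (not part of the diff): the new validators codify the on-disk naming scheme for rollback artifacts, version files named <scenario_type>_<timestamp>_<hash_suffix>.py inside a <timestamp>-<run_uuid> context directory. A minimal standalone Python sketch of equivalent checks, using made-up file and directory names:

import re

# Version file: <scenario_type>_<timestamp>_<hash_suffix>.py
# scenario_type may contain underscores, timestamp is ns since epoch, hash is 8 alphanumerics.
VERSION_FILE_RE = re.compile(r"^(?P<scenario_type>.+)_(?P<timestamp>\d+)_(?P<hash>[0-9a-zA-Z]{8})\.py$")

# Context directory: <timestamp>-<run_uuid>
CONTEXT_DIR_RE = re.compile(r"^(?P<timestamp>\d+)-(?P<run_uuid>.+)$")

def looks_like_version_file(name: str, scenario_type: str | None = None) -> bool:
    m = VERSION_FILE_RE.match(name)
    return bool(m) and (scenario_type is None or m.group("scenario_type") == scenario_type)

def looks_like_context_dir(name: str, run_uuid: str | None = None) -> bool:
    m = CONTEXT_DIR_RE.match(name)
    return bool(m) and (run_uuid is None or m.group("run_uuid") == run_uuid)

# Hypothetical names, not taken from a real run:
print(looks_like_version_file("pvc_scenarios_1732000000000000000_a1b2c3d4.py"))           # True
print(looks_like_version_file("pvc_scenarios_1732000000000000000_a1b2c3d4.py.executed"))  # False: already executed, skipped by the search
print(looks_like_context_dir("1732000000000000000-6f9d2c1e", run_uuid="6f9d2c1e"))        # True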

View File

@@ -117,23 +117,32 @@ def _parse_rollback_module(version_file_path: str) -> tuple[RollbackCallable, Ro
return rollback_callable, rollback_content
def execute_rollback_version_files(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: str, scenario_type: str | None = None):
def execute_rollback_version_files(
telemetry_ocp: "KrknTelemetryOpenshift",
run_uuid: str | None = None,
scenario_type: str | None = None,
ignore_auto_rollback_config: bool = False
):
"""
Execute rollback version files for the given run_uuid and scenario_type.
This function is called when a signal is received to perform rollback operations.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
:param ignore_auto_rollback_config: Flag to ignore auto rollback configuration. Will be set to True for manual execute-rollback calls.
"""
if not ignore_auto_rollback_config and RollbackConfig().auto is False:
logger.warning(f"Auto rollback is disabled, skipping execution for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
return
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip execution for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
logger.warning(f"Skip execution for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
return
# Execute all version files in the directory
logger.info(f"Executing rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
logger.info(f"Executing rollback version files for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
for version_file in version_files:
try:
logger.info(f"Executing rollback version file: {version_file}")
@@ -144,28 +153,37 @@ def execute_rollback_version_files(telemetry_ocp: "KrknTelemetryOpenshift", run_
logger.info('Executing rollback callable...')
rollback_callable(rollback_content, telemetry_ocp)
logger.info('Rollback completed.')
logger.info(f"Executed {version_file} successfully.")
success = True
except Exception as e:
success = False
logger.error(f"Failed to execute rollback version file {version_file}: {e}")
raise
# Rename the version file with .executed suffix if successful
if success:
try:
executed_file = f"{version_file}.executed"
os.rename(version_file, executed_file)
logger.info(f"Renamed {version_file} to {executed_file} successfully.")
except Exception as e:
logger.error(f"Failed to rename rollback version file {version_file}: {e}")
raise
def cleanup_rollback_version_files(run_uuid: str, scenario_type: str):
"""
Cleanup rollback version files for the given run_uuid and scenario_type.
This function is called to remove the rollback version files after execution.
This function is called to remove the rollback version files after successful scenario execution in run_scenarios.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
"""
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip cleanup for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
return
# Remove all version files in the directory
logger.info(f"Cleaning up rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type}")
for version_file in version_files:
@@ -176,7 +194,6 @@ def cleanup_rollback_version_files(run_uuid: str, scenario_type: str):
logger.error(f"Failed to remove rollback version file {version_file}: {e}")
raise
class RollbackHandler:
def __init__(
self,
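For illustration (not part of the diff): the execute-rollback CLI path now calls execute_rollback_version_files with ignore_auto_rollback_config=True, so manual invocations run even when auto rollback is disabled in the config; on success each version file is renamed with a .executed suffix so a later run skips it. A minimal sketch of that call, assuming telemetry_ocp is an already initialized KrknTelemetryOpenshift instance (construction omitted):

from krkn.rollback.handler import execute_rollback_version_files

def manual_rollback(telemetry_ocp, run_uuid: str | None = None, scenario_type: str | None = None) -> int:
    # run_uuid / scenario_type left as None mean "all"; the auto-rollback config flag is bypassed.
    try:
        execute_rollback_version_files(
            telemetry_ocp,
            run_uuid,
            scenario_type,
            ignore_auto_rollback_config=True,
        )
        return 0
    except Exception as exc:
        print(f"rollback failed: {exc}")
        return 1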

View File

@@ -115,14 +115,15 @@ class AbstractScenarioPlugin(ABC):
)
return_value = 1
# execute rollback files based on the return value
if return_value != 0:
if return_value == 0:
cleanup_rollback_version_files(
run_uuid, scenario_telemetry.scenario_type
)
else:
# execute rollback files based on the return value
execute_rollback_version_files(
telemetry, run_uuid, scenario_telemetry.scenario_type
)
cleanup_rollback_version_files(
run_uuid, scenario_telemetry.scenario_type
)
scenario_telemetry.exit_status = return_value
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(

View File

@@ -34,6 +34,21 @@ class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
)
namespace = get_yaml_item_value(scenario_config, "namespace", "")
duration = get_yaml_item_value(scenario_config, "duration", 60)
exclude_label = get_yaml_item_value(
scenario_config, "exclude_label", None
)
match_expressions = self._build_exclude_expressions(exclude_label)
if match_expressions:
# Log the format being used for better clarity
format_type = "dict" if isinstance(exclude_label, dict) else "string"
logging.info(
"Excluding pods with labels (%s format): %s",
format_type,
", ".join(
f"{expr['key']} NOT IN {expr['values']}"
for expr in match_expressions
),
)
start_time = int(time.time())
policy_name = f"krkn-deny-{get_random_string(5)}"
@@ -43,18 +58,30 @@ class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: """
+ policy_name
+ """
name: {{ policy_name }}
spec:
podSelector:
matchLabels: {{ pod_selector }}
{% if match_expressions %}
matchExpressions:
{% for expression in match_expressions %}
- key: {{ expression["key"] }}
operator: NotIn
values:
{% for value in expression["values"] %}
- {{ value }}
{% endfor %}
{% endfor %}
{% endif %}
policyTypes: {{ traffic_type }}
"""
)
t = Template(network_policy_template)
rendered_spec = t.render(
pod_selector=pod_selector, traffic_type=traffic_type
pod_selector=pod_selector,
traffic_type=traffic_type,
match_expressions=match_expressions,
policy_name=policy_name,
)
yaml_spec = yaml.safe_load(rendered_spec)
# Block the traffic by creating network policy
@@ -122,3 +149,63 @@ class ApplicationOutageScenarioPlugin(AbstractScenarioPlugin):
def get_scenario_types(self) -> list[str]:
return ["application_outages_scenarios"]
@staticmethod
def _build_exclude_expressions(exclude_label) -> list[dict]:
"""
Build match expressions for NetworkPolicy from exclude_label.
Supports multiple formats:
- Dict format (preferred, similar to pod_selector): {key1: value1, key2: [value2, value3]}
Example: {tier: "gold", env: ["prod", "staging"]}
- String format: "key1=value1,key2=value2" or "key1=value1|value2"
Example: "tier=gold,env=prod" or "tier=gold|platinum"
- List format (list of strings): ["key1=value1", "key2=value2"]
Example: ["tier=gold", "env=prod"]
Note: List elements must be strings in "key=value" format.
:param exclude_label: Can be dict, string, list of strings, or None
:return: List of match expression dictionaries
"""
expressions: list[dict] = []
if not exclude_label:
return expressions
def _append_expr(key: str, values):
if not key or values is None:
return
if not isinstance(values, list):
values = [values]
cleaned_values = [str(v).strip() for v in values if str(v).strip()]
if cleaned_values:
expressions.append({"key": key.strip(), "values": cleaned_values})
if isinstance(exclude_label, dict):
for k, v in exclude_label.items():
_append_expr(str(k), v)
return expressions
if isinstance(exclude_label, list):
selectors = exclude_label
else:
selectors = [sel.strip() for sel in str(exclude_label).split(",")]
for selector in selectors:
if not selector:
continue
if "=" not in selector:
logging.warning(
"exclude_label entry '%s' is invalid, expected key=value format",
selector,
)
continue
key, value = selector.split("=", 1)
value_items = (
[item.strip() for item in value.split("|") if item.strip()]
if value
else []
)
_append_expr(key, value_items or value)
return expressions
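For illustration (not part of the diff): the docstring above lists three accepted exclude_label formats. A condensed, self-contained restatement of the parsing, with hypothetical labels, showing the matchExpressions each format yields (the template then renders every expression with operator NotIn):

def build_exclude_expressions(exclude_label):
    # Condensed re-statement of _build_exclude_expressions, for illustration only.
    expressions = []
    if not exclude_label:
        return expressions
    if isinstance(exclude_label, dict):
        for key, values in exclude_label.items():
            values = values if isinstance(values, list) else [values]
            expressions.append({"key": str(key), "values": [str(v) for v in values]})
        return expressions
    selectors = exclude_label if isinstance(exclude_label, list) else str(exclude_label).split(",")
    for selector in selectors:
        key, _, value = selector.strip().partition("=")
        if key and value:
            expressions.append({"key": key, "values": value.split("|")})
    return expressions

print(build_exclude_expressions({"tier": "gold", "env": ["prod", "staging"]}))
# [{'key': 'tier', 'values': ['gold']}, {'key': 'env', 'values': ['prod', 'staging']}]
print(build_exclude_expressions("tier=gold|platinum,env=prod"))
# [{'key': 'tier', 'values': ['gold', 'platinum']}, {'key': 'env', 'values': ['prod']}]
print(build_exclude_expressions(["tier=gold", "env=prod"]))
# [{'key': 'tier', 'values': ['gold']}, {'key': 'env', 'values': ['prod']}]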

View File

@@ -70,6 +70,7 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
container_name = get_yaml_item_value(cont_scenario, "container_name", "")
kill_action = get_yaml_item_value(cont_scenario, "action", 1)
kill_count = get_yaml_item_value(cont_scenario, "count", 1)
exclude_label = get_yaml_item_value(cont_scenario, "exclude_label", "")
if not isinstance(kill_action, int):
logging.error(
"Please make sure the action parameter defined in the "
@@ -91,7 +92,19 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
pods = kubecli.get_all_pods(label_selector)
else:
# Only returns pod names
pods = kubecli.list_pods(namespace, label_selector)
# Use list_pods with exclude_label parameter to exclude pods
if exclude_label:
logging.info(
"Using exclude_label '%s' to exclude pods from container scenario %s in namespace %s",
exclude_label,
scenario_name,
namespace,
)
pods = kubecli.list_pods(
namespace=namespace,
label_selector=label_selector,
exclude_label=exclude_label if exclude_label else None
)
else:
if namespace == "*":
logging.error(
@@ -102,6 +115,7 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
# sys.exit(1)
raise RuntimeError()
pods = pod_names
# get container and pod name
container_pod_list = []
for pod in pods:
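For illustration (not part of the diff): a hypothetical container-scenario entry showing where exclude_label sits relative to the other keys the plugin reads; field values are made up, and the enclosing scenario file structure is omitted:

import yaml

cont_scenario = yaml.safe_load("""
namespace: etcd-namespace
label_selector: app=etcd
container_name: etcd
action: 1                    # must be an integer per the plugin's validation
count: 1
exclude_label: canary=true   # pods carrying this label are left untouched
""")

exclude_label = cont_scenario.get("exclude_label", "")
print(exclude_label)  # canary=true -> forwarded as-is to kubecli.list_pods(exclude_label=...)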

View File

@@ -25,6 +25,7 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
super().__init__(scenario_type)
self.k8s_client = None
self.original_vmi = None
self.vmis_list = []
# Scenario type is handled directly in execute_scenario
def get_scenario_types(self) -> list[str]:
@@ -106,20 +107,20 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
:return: The VMI object if found, None otherwise
"""
try:
vmis = self.custom_object_client.list_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
)
namespaces = self.k8s_client.list_namespaces_by_regex(namespace)
for namespace in namespaces:
vmis = self.custom_object_client.list_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
)
vmi_list = []
for vmi in vmis.get("items"):
vmi_name = vmi.get("metadata",{}).get("name")
match = re.match(regex_name, vmi_name)
if match:
vmi_list.append(vmi)
return vmi_list
for vmi in vmis.get("items"):
vmi_name = vmi.get("metadata",{}).get("name")
match = re.match(regex_name, vmi_name)
if match:
self.vmis_list.append(vmi)
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
@@ -152,21 +153,22 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
logging.error("vm_name parameter is required")
return 1
self.pods_status = PodsStatus()
vmis_list = self.get_vmis(vm_name,namespace)
self.get_vmis(vm_name,namespace)
for _ in range(kill_count):
rand_int = random.randint(0, len(vmis_list) - 1)
vmi = vmis_list[rand_int]
rand_int = random.randint(0, len(self.vmis_list) - 1)
vmi = self.vmis_list[rand_int]
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
vmi_name = vmi.get("metadata").get("name")
if not self.validate_environment(vmi_name, namespace):
vmi_namespace = vmi.get("metadata").get("namespace")
if not self.validate_environment(vmi_name, vmi_namespace):
return 1
vmi = self.get_vmi(vmi_name, namespace)
vmi = self.get_vmi(vmi_name, vmi_namespace)
self.affected_pod = AffectedPod(
pod_name=vmi_name,
namespace=namespace,
namespace=vmi_namespace,
)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
@@ -174,12 +176,12 @@ class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
self.original_vmi = vmi
logging.info(f"Captured initial state of VMI: {vm_name}")
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
result = self.delete_vmi(vmi_name, vmi_namespace, disable_auto_restart)
if result != 0:
self.pods_status.unrecovered.append(self.affected_pod)
continue
result = self.wait_for_running(vmi_name,namespace, timeout)
result = self.wait_for_running(vmi_name,vmi_namespace, timeout)
if result != 0:
self.pods_status.unrecovered.append(self.affected_pod)
continue
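For illustration (not part of the diff): with this change both the configured namespace and vm_name are treated as regular expressions; VMI names are filtered with re.match and one of the matches is then picked at random per kill. A tiny sketch with hypothetical names:

import re

vmi_names = ["db-vm-0", "db-vm-1", "web-vm-0"]   # hypothetical VMIs returned from the cluster
vm_name_regex = "db-vm-.*"                        # hypothetical vm_name from the scenario config

matched = [name for name in vmi_names if re.match(vm_name_regex, name)]
print(matched)  # ['db-vm-0', 'db-vm-1'] -- these populate self.vmis_list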

View File

@@ -1,3 +1,5 @@
import base64
import json
import logging
import random
import re
@@ -11,9 +13,12 @@ from krkn_lib.utils import get_yaml_item_value, log_exception
from krkn import cerberus, utils
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
class PvcScenarioPlugin(AbstractScenarioPlugin):
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
@@ -229,6 +234,24 @@ class PvcScenarioPlugin(AbstractScenarioPlugin):
logging.info("\n" + str(response))
if str(file_name).lower() in str(response).lower():
logging.info("%s file successfully created" % (str(full_path)))
# Set rollback callable to ensure temp file cleanup on failure or interruption
rollback_data = {
"pod_name": pod_name,
"container_name": container_name,
"mount_path": mount_path,
"file_name": file_name,
"full_path": full_path,
}
json_str = json.dumps(rollback_data)
encoded_data = base64.b64encode(json_str.encode('utf-8')).decode('utf-8')
self.rollback_handler.set_rollback_callable(
self.rollback_temp_file,
RollbackContent(
namespace=namespace,
resource_identifier=encoded_data,
),
)
else:
logging.error(
"PvcScenarioPlugin Failed to create tmp file with %s size"
@@ -313,5 +336,57 @@ class PvcScenarioPlugin(AbstractScenarioPlugin):
res = int(value[:-2]) * (base**exp)
return res
@staticmethod
def rollback_temp_file(
rollback_content: RollbackContent,
lib_telemetry: KrknTelemetryOpenshift,
):
"""Rollback function to remove temporary file created during the PVC scenario.
:param rollback_content: Rollback content containing namespace and encoded rollback data in resource_identifier.
:param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations.
"""
try:
namespace = rollback_content.namespace
import base64 # noqa
import json # noqa
decoded_data = base64.b64decode(rollback_content.resource_identifier.encode('utf-8')).decode('utf-8')
rollback_data = json.loads(decoded_data)
pod_name = rollback_data["pod_name"]
container_name = rollback_data["container_name"]
full_path = rollback_data["full_path"]
file_name = rollback_data["file_name"]
mount_path = rollback_data["mount_path"]
logging.info(
f"Rolling back PVC scenario: removing temp file {full_path} from pod {pod_name} in namespace {namespace}"
)
# Remove the temp file
command = "rm -f %s" % (str(full_path))
logging.info("Remove temp file from the PVC command:\n %s" % command)
response = lib_telemetry.get_lib_kubernetes().exec_cmd_in_pod(
[command], pod_name, namespace, container_name
)
logging.info("\n" + str(response))
# Verify removal
command = "ls -lh %s" % (str(mount_path))
logging.info("Check temp file is removed command:\n %s" % command)
response = lib_telemetry.get_lib_kubernetes().exec_cmd_in_pod(
[command], pod_name, namespace, container_name
)
logging.info("\n" + str(response))
if not (str(file_name).lower() in str(response).lower()):
logging.info("Temp file successfully removed during rollback")
else:
logging.warning(
f"Temp file {file_name} may still exist after rollback attempt"
)
logging.info("PVC scenario rollback completed successfully.")
except Exception as e:
logging.error(f"Failed to rollback PVC scenario temp file: {e}")
def get_scenario_types(self) -> list[str]:
return ["pvc_scenarios"]
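For illustration (not part of the diff): both directions of the rollback payload handling, JSON-serialized then base64-encoded into RollbackContent.resource_identifier, and decoded again inside rollback_temp_file before the temp file is removed. Values here are hypothetical:

import base64
import json

rollback_data = {
    "pod_name": "pvc-test-pod",
    "container_name": "app",
    "mount_path": "/data",
    "file_name": "kraken.tmp",
    "full_path": "/data/kraken.tmp",
}

# Encode: what run() stores in resource_identifier
encoded = base64.b64encode(json.dumps(rollback_data).encode("utf-8")).decode("utf-8")

# Decode: what rollback_temp_file() recovers before running `rm -f <full_path>` in the pod
decoded = json.loads(base64.b64decode(encoded.encode("utf-8")).decode("utf-8"))
assert decoded == rollback_data
print(decoded["full_path"])  # /data/kraken.tmp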

View File

@@ -1,13 +1,17 @@
import json
import logging
import time
import base64
import yaml
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn_lib.utils import get_yaml_item_value
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
@@ -78,6 +82,24 @@ class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
logging.info(f"service: {service_name} successfully patched!")
logging.info(f"original service manifest:\n\n{yaml.dump(original_service)}")
# Set rollback callable to ensure service restoration and pod cleanup on failure or interruption
rollback_data = {
"service_name": service_name,
"service_namespace": service_namespace,
"original_selectors": original_service["spec"]["selector"],
"webservice_pod_name": webservice.pod_name,
}
json_str = json.dumps(rollback_data)
encoded_data = base64.b64encode(json_str.encode("utf-8")).decode("utf-8")
self.rollback_handler.set_rollback_callable(
self.rollback_service_hijacking,
RollbackContent(
namespace=service_namespace,
resource_identifier=encoded_data,
),
)
logging.info(f"waiting {chaos_duration} before restoring the service")
time.sleep(chaos_duration)
selectors = [
@@ -106,5 +128,63 @@ class ServiceHijackingScenarioPlugin(AbstractScenarioPlugin):
)
return 1
@staticmethod
def rollback_service_hijacking(
rollback_content: RollbackContent,
lib_telemetry: KrknTelemetryOpenshift,
):
"""Rollback function to restore original service selectors and cleanup hijacker pod.
:param rollback_content: Rollback content containing namespace and encoded rollback data in resource_identifier.
:param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations.
"""
try:
namespace = rollback_content.namespace
import json # noqa
import base64 # noqa
# Decode rollback data from resource_identifier
decoded_data = base64.b64decode(rollback_content.resource_identifier.encode("utf-8")).decode("utf-8")
rollback_data = json.loads(decoded_data)
service_name = rollback_data["service_name"]
service_namespace = rollback_data["service_namespace"]
original_selectors = rollback_data["original_selectors"]
webservice_pod_name = rollback_data["webservice_pod_name"]
logging.info(
f"Rolling back service hijacking: restoring service {service_name} in namespace {service_namespace}"
)
# Restore original service selectors
selectors = [
"=".join([key, original_selectors[key]])
for key in original_selectors.keys()
]
logging.info(f"Restoring original service selectors: {selectors}")
restored_service = lib_telemetry.get_lib_kubernetes().replace_service_selector(
selectors, service_name, service_namespace
)
if restored_service is None:
logging.warning(
f"Failed to restore service {service_name} in namespace {service_namespace}"
)
else:
logging.info(f"Successfully restored service {service_name}")
# Delete the hijacker pod
logging.info(f"Deleting hijacker pod: {webservice_pod_name}")
try:
lib_telemetry.get_lib_kubernetes().delete_pod(
webservice_pod_name, service_namespace
)
logging.info(f"Successfully deleted hijacker pod: {webservice_pod_name}")
except Exception as e:
logging.warning(f"Failed to delete hijacker pod {webservice_pod_name}: {e}")
logging.info("Service hijacking rollback completed successfully.")
except Exception as e:
logging.error(f"Failed to rollback service hijacking: {e}")
def get_scenario_types(self) -> list[str]:
return ["service_hijacking_scenarios"]
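For illustration (not part of the diff): the restore step rebuilds the "key=value" selector strings expected by replace_service_selector from the saved selector dict, using the same expression as the plugin; the selector values below are hypothetical:

original_selectors = {"app": "payments", "tier": "backend"}  # captured from the original Service spec

selectors = ["=".join([key, original_selectors[key]]) for key in original_selectors.keys()]
print(selectors)  # ['app=payments', 'tier=backend'] -- passed to replace_service_selector(selectors, name, namespace)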

View File

@@ -1,3 +1,5 @@
import base64
import json
import logging
import os
import time
@@ -7,9 +9,12 @@ from krkn_lib import utils as krkn_lib_utils
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator
class SynFloodScenarioPlugin(AbstractScenarioPlugin):
@set_rollback_context_decorator
def run(
self,
run_uuid: str,
@@ -50,6 +55,16 @@ class SynFloodScenarioPlugin(AbstractScenarioPlugin):
config["attacker-nodes"],
)
pod_names.append(pod_name)
# Set rollback callable to ensure pod cleanup on failure or interruption
rollback_data = base64.b64encode(json.dumps(pod_names).encode('utf-8')).decode('utf-8')
self.rollback_handler.set_rollback_callable(
self.rollback_syn_flood_pods,
RollbackContent(
namespace=config["namespace"],
resource_identifier=rollback_data,
),
)
logging.info("waiting all the attackers to finish:")
did_finish = False
@@ -137,3 +152,23 @@ class SynFloodScenarioPlugin(AbstractScenarioPlugin):
def get_scenario_types(self) -> list[str]:
return ["syn_flood_scenarios"]
@staticmethod
def rollback_syn_flood_pods(rollback_content: RollbackContent, lib_telemetry: KrknTelemetryOpenshift):
"""
Rollback function to delete syn flood pods.
:param rollback_content: Rollback content containing namespace and resource_identifier.
:param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations
"""
try:
namespace = rollback_content.namespace
import base64 # noqa
import json # noqa
pod_names = json.loads(base64.b64decode(rollback_content.resource_identifier.encode('utf-8')).decode('utf-8'))
logging.info(f"Rolling back syn flood pods: {pod_names} in namespace: {namespace}")
for pod_name in pod_names:
lib_telemetry.get_lib_kubernetes().delete_pod(pod_name, namespace)
logging.info("Rollback of syn flood pods completed successfully.")
except Exception as e:
logging.error(f"Failed to rollback syn flood pods: {e}")

View File

@@ -144,6 +144,10 @@ class TimeActionsScenarioPlugin(AbstractScenarioPlugin):
node_names = scenario["object_name"]
elif "label_selector" in scenario.keys() and scenario["label_selector"]:
node_names = kubecli.list_nodes(scenario["label_selector"])
# filter out nodes matching exclude_label, if provided
if "exclude_label" in scenario.keys() and scenario["exclude_label"]:
excluded_nodes = kubecli.list_nodes(scenario["exclude_label"])
node_names = [node for node in node_names if node not in excluded_nodes]
for node in node_names:
self.skew_node(node, scenario["action"], kubecli)
logging.info("Reset date/time on node " + str(node))
@@ -189,6 +193,10 @@ class TimeActionsScenarioPlugin(AbstractScenarioPlugin):
counter += 1
elif "label_selector" in scenario.keys() and scenario["label_selector"]:
pod_names = kubecli.get_all_pods(scenario["label_selector"])
# filter out pods matching exclude_label, if provided
if "exclude_label" in scenario.keys() and scenario["exclude_label"]:
excluded_pods = kubecli.get_all_pods(scenario["exclude_label"])
pod_names = [pod for pod in pod_names if pod not in excluded_pods]
if len(pod_names) == 0:
logging.info(

View File

@@ -1,6 +1,7 @@
import time
import logging
import math
import queue
from datetime import datetime
from krkn_lib.models.telemetry.models import VirtCheck
@@ -20,37 +21,52 @@ class VirtChecker:
self.vm_list = []
self.threads = []
self.threads_limit = threads_limit
if self.namespace == "":
logging.info("kube virt checks config is not defined, skipping them")
return
# default to 0 when no config is set, so no threads are created later
self.batch_size = 0
self.ret_value = 0
vmi_name_match = get_yaml_item_value(kubevirt_check_config, "name", ".*")
self.krkn_lib = krkn_lib
self.disconnected = get_yaml_item_value(kubevirt_check_config, "disconnected", False)
self.only_failures = get_yaml_item_value(kubevirt_check_config, "only_failures", False)
self.interval = get_yaml_item_value(kubevirt_check_config, "interval", 2)
self.ssh_node = get_yaml_item_value(kubevirt_check_config, "ssh_node", "")
self.node_names = get_yaml_item_value(kubevirt_check_config, "node_names", "")
self.exit_on_failure = get_yaml_item_value(kubevirt_check_config, "exit_on_failure", False)
if self.namespace == "":
logging.info("kube virt checks config is not defined, skipping them")
return
try:
self.kube_vm_plugin = KubevirtVmOutageScenarioPlugin()
self.kube_vm_plugin.init_clients(k8s_client=krkn_lib)
vmis = self.kube_vm_plugin.get_vmis(vmi_name_match,self.namespace)
self.kube_vm_plugin.get_vmis(vmi_name_match,self.namespace)
except Exception as e:
logging.error('Virt Check init exception: ' + str(e))
return
for vmi in vmis:
return
# See if multiple node names exist
node_name_list = [node_name for node_name in self.node_names.split(',') if node_name]
for vmi in self.kube_vm_plugin.vmis_list:
node_name = vmi.get("status",{}).get("nodeName")
vmi_name = vmi.get("metadata",{}).get("name")
ip_address = vmi.get("status",{}).get("interfaces",[])[0].get("ipAddress")
self.vm_list.append(VirtCheck({'vm_name':vmi_name, 'ip_address': ip_address, 'namespace':self.namespace, 'node_name':node_name, "new_ip_address":""}))
namespace = vmi.get("metadata",{}).get("namespace")
# If node_name_list exists, only add if node name is in list
if len(node_name_list) > 0 and node_name in node_name_list:
self.vm_list.append(VirtCheck({'vm_name':vmi_name, 'ip_address': ip_address, 'namespace':namespace, 'node_name':node_name, "new_ip_address":""}))
elif len(node_name_list) == 0:
# If node_name_list is blank, add all vms
self.vm_list.append(VirtCheck({'vm_name':vmi_name, 'ip_address': ip_address, 'namespace':namespace, 'node_name':node_name, "new_ip_address":""}))
self.batch_size = math.ceil(len(self.vm_list)/self.threads_limit)
def check_disconnected_access(self, ip_address: str, worker_name:str = '', vmi_name: str = ''):
virtctl_vm_cmd = f"ssh core@{worker_name} 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{ip_address}'"
virtctl_vm_cmd = f"ssh core@{worker_name} -o ConnectTimeout=5 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{ip_address}'"
all_out = invoke_no_exit(virtctl_vm_cmd)
logging.debug(f"Checking disconnected access for {ip_address} on {worker_name} output: {all_out}")
virtctl_vm_cmd = f"ssh core@{worker_name} 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
logging.debug(f"Checking disconnected access for {ip_address} on {worker_name} with command: {virtctl_vm_cmd}")
virtctl_vm_cmd = f"ssh core@{worker_name} -o ConnectTimeout=5 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
output = invoke_no_exit(virtctl_vm_cmd)
if 'True' in output:
logging.debug(f"Disconnected access for {ip_address} on {worker_name} is successful: {output}")
@@ -62,16 +78,14 @@ class VirtChecker:
new_node_name = vmi.get("status",{}).get("nodeName")
# if vm gets deleted, it'll start up with a new ip address
if new_ip_address != ip_address:
virtctl_vm_cmd = f"ssh core@{worker_name} 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
logging.debug(f"Checking disconnected access for {new_ip_address} on {worker_name} with command: {virtctl_vm_cmd}")
virtctl_vm_cmd = f"ssh core@{worker_name} -o ConnectTimeout=5 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
new_output = invoke_no_exit(virtctl_vm_cmd)
logging.debug(f"Disconnected access for {ip_address} on {worker_name}: {new_output}")
if 'True' in new_output:
return True, new_ip_address, None
# if node gets stopped, vmis will start up with a new node (and with new ip)
if new_node_name != worker_name:
virtctl_vm_cmd = f"ssh core@{new_node_name} 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
logging.debug(f"Checking disconnected access for {new_ip_address} on {new_node_name} with command: {virtctl_vm_cmd}")
virtctl_vm_cmd = f"ssh core@{new_node_name} -o ConnectTimeout=5 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
new_output = invoke_no_exit(virtctl_vm_cmd)
logging.debug(f"Disconnected access for {ip_address} on {new_node_name}: {new_output}")
if 'True' in new_output:
@@ -79,8 +93,7 @@ class VirtChecker:
# try to connect with a common "up" node as last resort
if self.ssh_node:
# using new_ip_address here since if it hasn't changed it'll match ip_address
virtctl_vm_cmd = f"ssh core@{self.ssh_node} 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
logging.debug(f"Checking disconnected access for {new_ip_address} on {self.ssh_node} with command: {virtctl_vm_cmd}")
virtctl_vm_cmd = f"ssh core@{self.ssh_node} -o ConnectTimeout=5 'ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no root@{new_ip_address} 2>&1 | grep Permission' && echo 'True' || echo 'False'"
new_output = invoke_no_exit(virtctl_vm_cmd)
logging.debug(f"Disconnected access for {new_ip_address} on {self.ssh_node}: {new_output}")
if 'True' in new_output:
@@ -108,14 +121,19 @@ class VirtChecker:
for thread in self.threads:
thread.join()
def batch_list(self, queue: queue.Queue, batch_size=20):
# Provided prints to easily visualize how the threads are processed.
for i in range (0, len(self.vm_list),batch_size):
sub_list = self.vm_list[i: i+batch_size]
index = i
t = threading.Thread(target=self.run_virt_check,name=str(index), args=(sub_list,queue))
self.threads.append(t)
t.start()
def batch_list(self, queue: queue.Queue = None):
logging.info("batch size" + str(self.batch_size))
if self.batch_size > 0:
# Split the VM list into batches, handing each batch to its own worker thread.
for i in range (0, len(self.vm_list),self.batch_size):
if i+self.batch_size > len(self.vm_list):
sub_list = self.vm_list[i:]
else:
sub_list = self.vm_list[i: i+self.batch_size]
index = i
t = threading.Thread(target=self.run_virt_check,name=str(index), args=(sub_list,queue))
self.threads.append(t)
t.start()
def run_virt_check(self, vm_list_batch, virt_check_telemetry_queue: queue.Queue):
@@ -182,3 +200,63 @@ class VirtChecker:
else:
virt_check_telemetry.append(VirtCheck(virt_check_tracker[vm]))
virt_check_telemetry_queue.put(virt_check_telemetry)
def run_post_virt_check(self, vm_list_batch, virt_check_telemetry, post_virt_check_queue: queue.Queue):
virt_check_telemetry = []
virt_check_tracker = {}
start_timestamp = datetime.now()
for vm in vm_list_batch:
try:
if not self.disconnected:
vm_status = self.get_vm_access(vm.vm_name, vm.namespace)
else:
vm_status, new_ip_address, new_node_name = self.check_disconnected_access(vm.ip_address, vm.node_name, vm.vm_name)
if new_ip_address and vm.ip_address != new_ip_address:
vm.new_ip_address = new_ip_address
if new_node_name and vm.node_name != new_node_name:
vm.node_name = new_node_name
except Exception:
vm_status = False
if not vm_status:
virt_check_tracker= {
"vm_name": vm.vm_name,
"ip_address": vm.ip_address,
"namespace": vm.namespace,
"node_name": vm.node_name,
"status": vm_status,
"start_timestamp": start_timestamp.isoformat(),
"new_ip_address": vm.new_ip_address,
"duration": 0,
"end_timestamp": start_timestamp.isoformat()
}
virt_check_telemetry.append(VirtCheck(virt_check_tracker))
post_virt_check_queue.put(virt_check_telemetry)
def gather_post_virt_checks(self, kubevirt_check_telem):
post_kubevirt_check_queue = queue.Queue()
post_threads = []
if self.batch_size > 0:
for i in range (0, len(self.vm_list),self.batch_size):
sub_list = self.vm_list[i: i+self.batch_size]
index = i
t = threading.Thread(target=self.run_post_virt_check,name=str(index), args=(sub_list,kubevirt_check_telem, post_kubevirt_check_queue))
post_threads.append(t)
t.start()
kubevirt_check_telem = []
for thread in post_threads:
thread.join()
if not post_kubevirt_check_queue.empty():
kubevirt_check_telem.extend(post_kubevirt_check_queue.get_nowait())
if self.exit_on_failure and len(kubevirt_check_telem) > 0:
self.ret_value = 2
return kubevirt_check_telem
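The batching above is plain ceiling division: batch_size = ceil(len(vm_list) / threads_limit), so at most threads_limit threads are started and the final slice absorbs the remainder. A small illustration with made-up numbers:

import math

vm_count = 23          # hypothetical VM list length
threads_limit = 5
batch_size = math.ceil(vm_count / threads_limit)   # 5

batches = [list(range(i, min(i + batch_size, vm_count)))
           for i in range(0, vm_count, batch_size)]
print(len(batches), [len(b) for b in batches])      # 5 [5, 5, 5, 5, 3]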

View File

@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.6
krkn-lib==5.1.11
krkn-lib==5.1.12
lxml==5.1.0
kubernetes==34.1.0
numpy==1.26.4

View File

@@ -133,7 +133,7 @@ def main(options, command: Optional[str]) -> int:
telemetry_api_url = config["telemetry"].get("api_url")
health_check_config = get_yaml_item_value(config, "health_checks",{})
kubevirt_check_config = get_yaml_item_value(config, "kubevirt_checks", {})
# Initialize clients
if not os.path.isfile(kubeconfig_path) and not os.path.isfile(
"/var/run/secrets/kubernetes.io/serviceaccount/token"
@@ -408,15 +408,11 @@ def main(options, command: Optional[str]) -> int:
kubevirt_checker.thread_join()
kubevirt_check_telem = []
i =0
while i <= kubevirt_checker.threads_limit:
if not kubevirt_check_telemetry_queue.empty():
kubevirt_check_telem.extend(kubevirt_check_telemetry_queue.get_nowait())
else:
break
i+= 1
while not kubevirt_check_telemetry_queue.empty():
kubevirt_check_telem.extend(kubevirt_check_telemetry_queue.get_nowait())
chaos_telemetry.virt_checks = kubevirt_check_telem
post_kubevirt_check = kubevirt_checker.gather_post_virt_checks(kubevirt_check_telem)
chaos_telemetry.post_virt_checks = post_kubevirt_check
# Cloud platform and network plugins metadata
# will be collected through OCP-specific APIs
# if the platform is OpenShift
@@ -556,6 +552,10 @@ def main(options, command: Optional[str]) -> int:
logging.error("Health check failed for the applications, Please check; exiting")
return health_checker.ret_value
if kubevirt_checker.ret_value != 0:
logging.error("Kubevirt check still had failed VMIs at end of run, Please check; exiting")
return kubevirt_checker.ret_value
logging.info(
"Successfully finished running Kraken. UUID for the run: "
"%s. Report generated at %s. Exiting" % (run_uuid, report_file)

View File

@@ -6,3 +6,4 @@ scenarios:
action: 1
count: 1
retry_wait: 60
exclude_label: ""

View File

@@ -3,3 +3,4 @@ application_outage: # Scenario to create an out
namespace: <namespace-with-application> # Namespace to target - all application routes will go inaccessible if pod selector is empty
pod_selector: {app: foo} # Pods to target
block: [Ingress, Egress] # It can be Ingress or Egress or Ingress, Egress
exclude_label: "" # Optional label selector to exclude pods. Supports dict, string, or list format

View File

@@ -6,3 +6,4 @@ scenarios:
action: 1
count: 1
expected_recovery_time: 120
exclude_label: ""

View File

@@ -0,0 +1,37 @@
import tempfile
import unittest
from krkn.scenario_plugins.native.run_python_plugin import (
RunPythonFileInput,
run_python_file,
)
class RunPythonPluginTest(unittest.TestCase):
def test_success_execution(self):
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(bytes("print('Hello world!')", "utf-8"))
tmp_file.flush()
output_id, output_data = run_python_file(
params=RunPythonFileInput(tmp_file.name),
run_id="test-python-plugin-success",
)
self.assertEqual("success", output_id)
self.assertEqual("Hello world!\n", output_data.stdout)
def test_error_execution(self):
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(
bytes("import sys\nprint('Hello world!')\nsys.exit(42)\n", "utf-8")
)
tmp_file.flush()
output_id, output_data = run_python_file(
params=RunPythonFileInput(tmp_file.name), run_id="test-python-plugin-error"
)
self.assertEqual("error", output_id)
self.assertEqual(42, output_data.exit_code)
self.assertEqual("Hello world!\n", output_data.stdout)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ApplicationOutageScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_application_outage_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.application_outage.application_outage_scenario_plugin import ApplicationOutageScenarioPlugin
class TestApplicationOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ApplicationOutageScenarioPlugin
"""
self.plugin = ApplicationOutageScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["application_outages_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ContainerScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_container_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.container.container_scenario_plugin import ContainerScenarioPlugin
class TestContainerScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ContainerScenarioPlugin
"""
self.plugin = ContainerScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["container_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,503 @@
#!/usr/bin/env python3
"""
Test suite for HealthChecker class
This test file provides comprehensive coverage for the main functionality of HealthChecker:
- HTTP request making with various authentication methods
- Health check monitoring with status tracking
- Failure detection and recovery tracking
- Exit on failure behavior
- Telemetry collection
Usage:
python -m coverage run -a -m unittest tests/test_health_checker.py -v
Assisted By: Claude Code
"""
import queue
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch
from krkn_lib.models.telemetry.models import HealthCheck
from krkn.utils.HealthChecker import HealthChecker
class TestHealthChecker(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for HealthChecker
"""
self.checker = HealthChecker(iterations=5)
self.health_check_queue = queue.Queue()
def tearDown(self):
"""
Clean up after each test
"""
self.checker.current_iterations = 0
self.checker.ret_value = 0
def make_increment_side_effect(self, response_data):
"""
Helper to create a side effect that increments current_iterations
"""
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
return response_data
return side_effect
@patch('requests.get')
def test_make_request_success(self, mock_get):
"""
Test make_request returns success for 200 status code
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
result = self.checker.make_request("http://example.com")
self.assertEqual(result["url"], "http://example.com")
self.assertEqual(result["status"], True)
self.assertEqual(result["status_code"], 200)
mock_get.assert_called_once_with(
"http://example.com",
auth=None,
headers=None,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_with_auth(self, mock_get):
"""
Test make_request with basic authentication
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
auth = ("user", "pass")
result = self.checker.make_request("http://example.com", auth=auth)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"http://example.com",
auth=auth,
headers=None,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_with_bearer_token(self, mock_get):
"""
Test make_request with bearer token authentication
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
headers = {"Authorization": "Bearer token123"}
result = self.checker.make_request("http://example.com", headers=headers)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"http://example.com",
auth=None,
headers=headers,
verify=True,
timeout=3
)
@patch('requests.get')
def test_make_request_failure(self, mock_get):
"""
Test make_request returns failure for non-200 status code
"""
mock_response = MagicMock()
mock_response.status_code = 500
mock_get.return_value = mock_response
result = self.checker.make_request("http://example.com")
self.assertEqual(result["status"], False)
self.assertEqual(result["status_code"], 500)
@patch('requests.get')
def test_make_request_with_verify_false(self, mock_get):
"""
Test make_request with SSL verification disabled
"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
result = self.checker.make_request("https://example.com", verify=False)
self.assertEqual(result["status"], True)
mock_get.assert_called_once_with(
"https://example.com",
auth=None,
headers=None,
verify=False,
timeout=3
)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_empty_config(self, mock_sleep, mock_make_request):
"""
Test run_health_check with empty config skips checks
"""
config = {
"config": [],
"interval": 2
}
self.checker.run_health_check(config, self.health_check_queue)
mock_make_request.assert_not_called()
self.assertTrue(self.health_check_queue.empty())
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_successful_requests(self, mock_sleep, mock_make_request):
"""
Test run_health_check with all successful requests
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 2
self.checker.run_health_check(config, self.health_check_queue)
# Should have telemetry
self.assertFalse(self.health_check_queue.empty())
telemetry = self.health_check_queue.get()
self.assertEqual(len(telemetry), 1)
self.assertEqual(telemetry[0].status, True)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_failure_then_recovery(self, mock_sleep, mock_make_request):
"""
Test run_health_check detects failure and recovery
"""
# Create side effects that increment and return different values
call_count = [0]
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
call_count[0] += 1
if call_count[0] == 1:
return {"url": "http://example.com", "status": False, "status_code": 500}
else:
return {"url": "http://example.com", "status": True, "status_code": 200}
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 3
self.checker.run_health_check(config, self.health_check_queue)
# Should have telemetry showing failure period
self.assertFalse(self.health_check_queue.empty())
telemetry = self.health_check_queue.get()
# Should have at least 2 entries: one for failure period, one for success period
self.assertGreaterEqual(len(telemetry), 1)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_bearer_token(self, mock_sleep, mock_make_request):
"""
Test run_health_check correctly handles bearer token
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": "test-token-123",
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify bearer token was added to headers
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][2]['Authorization'], "Bearer test-token-123")
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_auth(self, mock_sleep, mock_make_request):
"""
Test run_health_check correctly handles basic auth
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": "user,pass",
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify auth tuple was created correctly
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][1], ("user", "pass"))
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exit_on_failure(self, mock_sleep, mock_make_request):
"""
Test run_health_check sets ret_value=2 when exit_on_failure is True
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": False,
"status_code": 500
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": True
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# ret_value should be set to 2 on failure
self.assertEqual(self.checker.ret_value, 2)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exit_on_failure_not_set_on_success(self, mock_sleep, mock_make_request):
"""
Test run_health_check does not set ret_value when request succeeds
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": True
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# ret_value should remain 0 on success
self.assertEqual(self.checker.ret_value, 0)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_with_verify_url_false(self, mock_sleep, mock_make_request):
"""
Test run_health_check respects verify_url setting
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "https://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "https://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False,
"verify_url": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Verify that verify parameter was set to False
# make_request is called as: make_request(url, auth, headers, verify_url)
call_args = mock_make_request.call_args
self.assertEqual(call_args[0][3], False)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_exception_handling(self, mock_sleep, mock_make_request):
"""
Test run_health_check handles exceptions during requests
"""
# Simulate exception during request but also increment to avoid infinite loop
def side_effect(*args, **kwargs):
self.checker.current_iterations += 1
raise Exception("Connection error")
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
# Should not raise exception
self.checker.run_health_check(config, self.health_check_queue)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_multiple_urls(self, mock_sleep, mock_make_request):
"""
Test run_health_check with multiple URLs
"""
call_count = [0]
def side_effect(*args, **kwargs):
call_count[0] += 1
# Increment only after both URLs are called (one iteration)
if call_count[0] % 2 == 0:
self.checker.current_iterations += 1
return {
"status": True,
"status_code": 200
}
mock_make_request.side_effect = side_effect
config = {
"config": [
{
"url": "http://example1.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
},
{
"url": "http://example2.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 0.01
}
self.checker.iterations = 1
self.checker.run_health_check(config, self.health_check_queue)
# Should have called make_request for both URLs
self.assertEqual(mock_make_request.call_count, 2)
@patch('krkn.utils.HealthChecker.HealthChecker.make_request')
@patch('time.sleep')
def test_run_health_check_custom_interval(self, mock_sleep, mock_make_request):
"""
Test run_health_check uses custom interval
"""
mock_make_request.side_effect = self.make_increment_side_effect({
"url": "http://example.com",
"status": True,
"status_code": 200
})
config = {
"config": [
{
"url": "http://example.com",
"bearer_token": None,
"auth": None,
"exit_on_failure": False
}
],
"interval": 5
}
self.checker.iterations = 2
self.checker.run_health_check(config, self.health_check_queue)
# Verify sleep was called with custom interval
mock_sleep.assert_called_with(5)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for HogsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_hogs_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.hogs.hogs_scenario_plugin import HogsScenarioPlugin
class TestHogsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for HogsScenarioPlugin
"""
self.plugin = HogsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["hog_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,696 @@
#!/usr/bin/env python3
"""
Test suite for KubeVirt VM Outage Scenario Plugin class
Note: This test file uses mocks extensively to avoid needing actual Kubernetes/KubeVirt infrastructure.
Usage:
python -m coverage run -a -m unittest tests/test_kubevirt_vm_outage.py -v
Assisted By: Claude Code
"""
import copy
import itertools
import os
import tempfile
import unittest
from unittest.mock import MagicMock, patch
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kubernetes.client.rest import ApiException
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
self.plugin.custom_object_client = self.custom_object_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
# Mock custom resource definition list with KubeVirt CRDs
crd_list = MagicMock()
crd_item = MagicMock()
crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io"
crd_list.items = [crd_item]
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
},
"status": {
"phase": "Running"
}
}
# Create test config
self.config = {
"scenarios": [
{
"name": "kubevirt outage test",
"scenario": "kubevirt_vm_outage",
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"duration": 0
}
}
]
}
# Create a temporary config file
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
yaml.dump(self.config, f)
# Mock dependencies
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
# Initialize counters for reusable mock functions
self.delete_count = 0
self.wait_count = 0
def mock_delete(self, *args, **kwargs):
self.delete_count += 1
self.plugin.affected_pod = AffectedPod(pod_name=f"test-vm-{self.delete_count}", namespace="default")
self.plugin.affected_pod.pod_rescheduling_time = 5.0
return 0
def mock_wait(self, *args, **kwargs):
self.wait_count += 1
self.plugin.affected_pod.pod_readiness_time = 3.0
return 0
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi and wait_for_running to simulate success
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_delete.assert_called_once_with("test-vm", "default", False)
mock_wait.assert_called_once_with("test-vm", "default", 60)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi to simulate failure
with patch.object(self.plugin, 'delete_vmi', return_value=1) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', return_value=0) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_delete.assert_called_once_with("test-vm", "default", False)
mock_wait.assert_not_called()
def test_disable_auto_restart(self):
"""
Test VM auto-restart can be disabled
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return True
with patch.object(self.plugin, 'validate_environment', return_value=True):
# Mock delete_vmi and wait_for_running
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_delete:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wait:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# delete_vmi should be called with disable_auto_restart=True
mock_delete.assert_called_once_with("test-vm", "default", True)
mock_wait.assert_called_once_with("test-vm", "default", 60)
def test_recovery_when_vmi_does_not_exist(self):
"""
Test recovery logic when VMI does not exist after deletion
"""
# Initialize the plugin's custom_object_client
self.plugin.custom_object_client = self.custom_object_client
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
)
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Populate vmis_list to avoid randrange error
self.plugin.vmis_list = [self.mock_vmi]
# Mock get_vmis to not clear the list
with patch.object(self.plugin, 'get_vmis'):
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock validate_environment to return False (KubeVirt not installed)
with patch.object(self.plugin, 'validate_environment', return_value=False):
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
# When validation fails, run() returns 1 due to exception handling
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Initialize the plugin's custom_object_client and required attributes
self.plugin.custom_object_client = self.custom_object_client
# Initialize original_vmi which is required by delete_vmi
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
# Initialize pods_status which delete_vmi needs
from krkn_lib.models.k8s import PodsStatus, AffectedPod
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI with same creationTimestamp (never gets recreated)
mock_vmi_with_time = self.mock_vmi.copy()
mock_vmi_with_time['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
with patch.object(self.plugin, 'get_vmi', return_value=mock_vmi_with_time):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
def test_get_vmi_api_exception_non_404(self):
"""
Test get_vmi raises ApiException for non-404 errors
"""
# Mock API exception with non-404 status
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error)
with self.assertRaises(ApiException):
self.plugin.get_vmi("test-vm", "default")
def test_get_vmi_general_exception(self):
"""
Test get_vmi raises general exceptions
"""
# Mock general exception
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
with self.assertRaises(Exception):
self.plugin.get_vmi("test-vm", "default")
def test_get_vmis_with_regex_matching(self):
"""
Test get_vmis successfully filters VMIs by regex pattern
"""
# Mock namespace list
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default", "test-ns"])
# Mock VMI list with multiple VMIs
vmi_list = {
"items": [
{"metadata": {"name": "test-vm-1"}, "status": {"phase": "Running"}},
{"metadata": {"name": "test-vm-2"}, "status": {"phase": "Running"}},
{"metadata": {"name": "other-vm"}, "status": {"phase": "Running"}},
]
}
self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=vmi_list)
# Test with regex pattern that matches test-vm-*
self.plugin.get_vmis("test-vm-.*", "default")
# Should have 4 VMs (2 per namespace * 2 namespaces)
self.assertEqual(len(self.plugin.vmis_list), 4)
# Verify only test-vm-* were added
for vmi in self.plugin.vmis_list:
self.assertTrue(vmi["metadata"]["name"].startswith("test-vm-"))
def test_get_vmis_api_exception_404(self):
"""
Test get_vmis handles 404 ApiException gracefully
"""
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error)
# Should not raise, returns empty list
result = self.plugin.get_vmis("test-vm", "default")
self.assertEqual(result, [])
def test_get_vmis_api_exception_non_404(self):
"""
Test get_vmis raises ApiException for non-404 errors
"""
self.k8s_client.list_namespaces_by_regex = MagicMock(return_value=["default"])
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.list_namespaced_custom_object = MagicMock(side_effect=api_error)
with self.assertRaises(ApiException):
self.plugin.get_vmis("test-vm", "default")
def test_patch_vm_spec_success(self):
"""
Test patch_vm_spec successfully patches VM
"""
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {"running": True}
}
self.custom_object_client.get_namespaced_custom_object = MagicMock(return_value=mock_vm)
self.custom_object_client.patch_namespaced_custom_object = MagicMock(return_value=mock_vm)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertTrue(result)
self.custom_object_client.patch_namespaced_custom_object.assert_called_once()
def test_patch_vm_spec_api_exception(self):
"""
Test patch_vm_spec handles ApiException
"""
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.get_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertFalse(result)
def test_patch_vm_spec_general_exception(self):
"""
Test patch_vm_spec handles general exceptions
"""
self.custom_object_client.get_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
result = self.plugin.patch_vm_spec("test-vm", "default", False)
self.assertFalse(result)
def test_delete_vmi_api_exception_404(self):
"""
Test delete_vmi handles 404 ApiException during deletion
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
api_error = ApiException(status=404, reason="Not Found")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
def test_delete_vmi_api_exception_non_404(self):
"""
Test delete_vmi handles non-404 ApiException during deletion
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
api_error = ApiException(status=500, reason="Internal Server Error")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(side_effect=api_error)
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 1)
def test_delete_vmi_successful_recreation(self):
"""
Test delete_vmi succeeds when VMI is recreated with new creationTimestamp
"""
# Initialize required attributes - use deepcopy to avoid shared references
self.plugin.original_vmi = copy.deepcopy(self.mock_vmi)
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock get_vmi to return VMI with new creationTimestamp - use deepcopy
new_vmi = copy.deepcopy(self.mock_vmi)
new_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:05:00Z'
# Use itertools to create an infinite iterator for time values
time_iter = itertools.count(0, 0.001)
with patch.object(self.plugin, 'get_vmi', return_value=new_vmi):
with patch('time.sleep'):
with patch('time.time', side_effect=lambda: next(time_iter)):
result = self.plugin.delete_vmi("test-vm", "default", False)
self.assertEqual(result, 0)
self.assertIsNotNone(self.plugin.affected_pod.pod_rescheduling_time)
def test_delete_vmi_with_disable_auto_restart_failure(self):
"""
Test delete_vmi continues when patch_vm_spec fails and VMI stays deleted
"""
# Initialize required attributes
self.plugin.original_vmi = self.mock_vmi.copy()
self.plugin.original_vmi['metadata']['creationTimestamp'] = '2023-01-01T00:00:00Z'
self.plugin.pods_status = PodsStatus()
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock patch_vm_spec to fail
with patch.object(self.plugin, 'patch_vm_spec', return_value=False):
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock VMI deleted (returns None) - it will timeout waiting for recreation
with patch.object(self.plugin, 'get_vmi', return_value=None):
with patch('time.sleep'):
# Use itertools to create infinite time sequence
# Use 1.0 increment to quickly reach timeout (120 seconds)
time_iter = itertools.count(0, 1.0)
with patch('time.time', side_effect=lambda: next(time_iter)):
result = self.plugin.delete_vmi("test-vm", "default", True)
# When VMI stays deleted (None), delete_vmi waits for recreation and times out
self.assertEqual(result, 1)
def test_wait_for_running_timeout(self):
"""
Test wait_for_running times out when VMI doesn't reach Running state
"""
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# Mock VMI in Pending state
pending_vmi = self.mock_vmi.copy()
pending_vmi['status']['phase'] = 'Pending'
with patch.object(self.plugin, 'get_vmi', return_value=pending_vmi):
with patch('time.sleep'):
with patch('time.time', side_effect=[0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 121]):
result = self.plugin.wait_for_running("test-vm", "default", 120)
self.assertEqual(result, 1)
def test_wait_for_running_vmi_not_exists(self):
"""
Test wait_for_running when VMI doesn't exist yet
"""
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
# First return None (not exists), then return running VMI
running_vmi = self.mock_vmi.copy()
running_vmi['status']['phase'] = 'Running'
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
with patch('time.sleep'):
# time.time() called: start_time (0), while loop iteration 1 (1), iteration 2 (2), iteration 3 (3), end_time (3)
with patch('time.time', side_effect=[0, 1, 2, 3, 3]):
result = self.plugin.wait_for_running("test-vm", "default", 120)
self.assertEqual(result, 0)
self.assertIsNotNone(self.plugin.affected_pod.pod_readiness_time)
def test_recover_no_original_vmi(self):
"""
Test recover fails when no original VMI is captured
"""
self.plugin.original_vmi = None
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 1)
def test_recover_exception_during_creation(self):
"""
Test recover handles exception during VMI creation
"""
self.plugin.original_vmi = self.mock_vmi.copy()
self.custom_object_client.create_namespaced_custom_object = MagicMock(
side_effect=Exception("Creation failed")
)
with patch.object(self.plugin, 'get_vmi', return_value=None):
with patch('time.sleep'):
with patch('time.time', side_effect=[0, 301]):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 1)
def test_execute_scenario_missing_vm_name(self):
"""
Test execute_scenario fails when vm_name is missing
"""
config = {
"parameters": {
"namespace": "default"
}
}
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_execute_scenario_vmi_not_found(self):
"""
Test execute_scenario when VMI is not found after get_vmi
"""
self.plugin.vmis_list = [self.mock_vmi]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default"
}
}
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
# First get_vmi returns VMI, second returns None
with patch.object(self.plugin, 'get_vmi', side_effect=[self.mock_vmi, None]):
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should be PodsStatus with unrecovered pod
self.assertIsInstance(result, type(self.plugin.pods_status))
def test_execute_scenario_with_kill_count(self):
"""
Test execute_scenario with kill_count > 1
"""
# Create multiple VMIs
vmi_1 = self.mock_vmi.copy()
vmi_1["metadata"]["name"] = "test-vm-1"
vmi_2 = self.mock_vmi.copy()
vmi_2["metadata"]["name"] = "test-vm-2"
self.plugin.vmis_list = [vmi_1, vmi_2]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"kill_count": 2
}
}
# Reset counters
self.delete_count = 0
self.wait_count = 0
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
with patch.object(self.plugin, 'get_vmi', side_effect=[vmi_1, vmi_2]):
with patch.object(self.plugin, 'delete_vmi', side_effect=self.mock_delete) as mock_del:
with patch.object(self.plugin, 'wait_for_running', side_effect=self.mock_wait) as mock_wt:
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should call delete_vmi and wait_for_running twice
self.assertEqual(mock_del.call_count, 2)
self.assertEqual(mock_wt.call_count, 2)
def test_execute_scenario_wait_for_running_failure(self):
"""
Test execute_scenario when wait_for_running fails
"""
self.plugin.vmis_list = [self.mock_vmi]
config = {
"parameters": {
"vm_name": "test-vm",
"namespace": "default"
}
}
def mock_delete(*args, **kwargs):
self.plugin.affected_pod = AffectedPod(pod_name="test-vm", namespace="default")
self.plugin.affected_pod.pod_rescheduling_time = 5.0
return 0
with patch.object(self.plugin, 'get_vmis'):
with patch.object(self.plugin, 'validate_environment', return_value=True):
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
with patch.object(self.plugin, 'delete_vmi', side_effect=mock_delete):
with patch.object(self.plugin, 'wait_for_running', return_value=1):
result = self.plugin.execute_scenario(config, self.scenario_telemetry)
# Should have unrecovered pod
self.assertEqual(len(result.unrecovered), 1)
def test_validate_environment_exception(self):
"""
Test validate_environment handles exceptions
"""
self.custom_object_client.list_namespaced_custom_object = MagicMock(
side_effect=Exception("Connection error")
)
result = self.plugin.validate_environment("test-vm", "default")
self.assertFalse(result)
def test_validate_environment_vmi_not_found(self):
"""
Test validate_environment when VMI doesn't exist
"""
# Mock CRDs exist
mock_crd_list = MagicMock()
mock_crd_list.items = MagicMock(return_value=["item1"])
self.custom_object_client.list_namespaced_custom_object = MagicMock(return_value=mock_crd_list)
# Mock VMI not found
with patch.object(self.plugin, 'get_vmi', return_value=None):
result = self.plugin.validate_environment("test-vm", "default")
self.assertFalse(result)
def test_init_clients(self):
"""
Test init_clients initializes k8s client correctly
"""
mock_k8s = MagicMock(spec=KrknKubernetes)
mock_custom_client = MagicMock()
mock_k8s.custom_object_client = mock_custom_client
self.plugin.init_clients(mock_k8s)
self.assertEqual(self.plugin.k8s_client, mock_k8s)
self.assertEqual(self.plugin.custom_object_client, mock_custom_client)
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["kubevirt_vm_outage"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ManagedClusterScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_managed_cluster_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.managed_cluster.managed_cluster_scenario_plugin import ManagedClusterScenarioPlugin
class TestManagedClusterScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ManagedClusterScenarioPlugin
"""
self.plugin = ManagedClusterScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["managedcluster_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NativeScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_native_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.native.native_scenario_plugin import NativeScenarioPlugin
class TestNativeScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NativeScenarioPlugin
"""
self.plugin = NativeScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario types
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pod_network_scenarios", "ingress_node_scenarios"])
self.assertEqual(len(result), 2)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NetworkChaosNgScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_network_chaos_ng_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.network_chaos_ng_scenario_plugin import NetworkChaosNgScenarioPlugin
class TestNetworkChaosNgScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NetworkChaosNgScenarioPlugin
"""
self.plugin = NetworkChaosNgScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["network_chaos_ng_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NetworkChaosScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_network_chaos_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos.network_chaos_scenario_plugin import NetworkChaosScenarioPlugin
class TestNetworkChaosScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NetworkChaosScenarioPlugin
"""
self.plugin = NetworkChaosScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["network_chaos_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for NodeActionsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_node_actions_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.node_actions.node_actions_scenario_plugin import NodeActionsScenarioPlugin
class TestNodeActionsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NodeActionsScenarioPlugin
"""
self.plugin = NodeActionsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["node_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for PodDisruptionScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_pod_disruption_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.pod_disruption.pod_disruption_scenario_plugin import PodDisruptionScenarioPlugin
class TestPodDisruptionScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for PodDisruptionScenarioPlugin
"""
self.plugin = PodDisruptionScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pod_disruption_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for PvcScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_pvc_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.pvc.pvc_scenario_plugin import PvcScenarioPlugin
class TestPvcScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for PvcScenarioPlugin
"""
self.plugin = PvcScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["pvc_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -58,25 +58,23 @@ class TestRollbackScenarioPlugin:
for vf in version_files
]
def execute_version_file(self, version_file: str):
def execute_version_file(self, version_file: str, telemetry_ocp: KrknTelemetryOpenshift):
"""
Execute a rollback version file using subprocess.
Execute a rollback version file using the new importlib approach.
:param version_file: The path to the version file to execute.
"""
print(f"Executing rollback version file: {version_file}")
result = subprocess.run(
[sys.executable, version_file],
capture_output=True,
text=True,
)
assert result.returncode == 0, (
f"Rollback version file {version_file} failed with return code {result.returncode}. "
f"Output: {result.stdout}, Error: {result.stderr}"
)
print(
f"Rollback version file executed successfully: {version_file} with output: {result.stdout}"
)
try:
from krkn.rollback.handler import _parse_rollback_module
rollback_callable, rollback_content = _parse_rollback_module(version_file)
rollback_callable(rollback_content, telemetry_ocp)
print(f"Rollback version file executed successfully: {version_file}")
except Exception as e:
raise AssertionError(
f"Rollback version file {version_file} failed with error: {e}"
)
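
The refactored helper above assumes `_parse_rollback_module` hands back a rollback callable plus the parsed content it needs. As a point of reference only, a minimal importlib-based loader along those lines might look like the sketch below; the module attribute names (`rollback`, `ROLLBACK_CONTENT`) are hypothetical and not taken from krkn.rollback.handler.

```python
# Illustrative sketch only: load a rollback version file as a module and
# return (callable, content). Attribute names are assumptions, not krkn's API.
import importlib.util


def parse_rollback_module_sketch(version_file: str):
    spec = importlib.util.spec_from_file_location("rollback_version", version_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # execute the file so its symbols are defined
    # Assume the file defines rollback(content, telemetry_ocp) and a module-level
    # content object; both names are hypothetical.
    return module.rollback, module.ROLLBACK_CONTENT
```
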
@pytest.fixture(autouse=True)
def setup_logging(self):
@@ -130,7 +128,11 @@ class TestRollbackScenarioPlugin:
)
@pytest.mark.usefixtures("setup_rollback_config")
def test_simple_rollback_scenario_plugin(self, lib_telemetry, scenario_telemetry):
def test_simple_rollback_scenario_plugin(
self,
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
):
from tests.rollback_scenario_plugins.simple import SimpleRollbackScenarioPlugin
scenario_type = "simple_rollback_scenario"
@@ -157,4 +159,166 @@ class TestRollbackScenarioPlugin:
)
# Execute the rollback version file
for version_file in version_files:
self.execute_version_file(version_file)
self.execute_version_file(version_file, lib_telemetry)
class TestRollbackConfig:
@pytest.mark.parametrize("directory_name,run_uuid,expected", [
("123456789-abcdefgh", "abcdefgh", True),
("123456789-abcdefgh", None, True),
("123456789-abcdefgh", "ijklmnop", False),
("123456789-", "abcdefgh", False),
("-abcdefgh", "abcdefgh", False),
("123456789-abcdefgh-ijklmnop", "abcdefgh", False),
])
def test_is_rollback_context_directory_format(self, directory_name, run_uuid, expected):
assert RollbackConfig.is_rollback_context_directory_format(directory_name, run_uuid) == expected
@pytest.mark.parametrize("file_name,expected", [
("simple_rollback_scenario_123456789_abcdefgh.py", True),
("simple_rollback_scenario_123456789_abcdefgh.py.executed", False),
("simple_rollback_scenario_123456789_abc.py", False),
("simple_rollback_scenario_123456789_abcdefgh.txt", False),
("simple_rollback_scenario_123456789_.py", False),
])
def test_is_rollback_version_file_format(self, file_name, expected):
assert RollbackConfig.is_rollback_version_file_format(file_name) == expected
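
Taken together, the parametrized cases pin down the expected naming: a rollback context directory looks like `<timestamp>-<run_uuid>`, and a version file looks like `<scenario>_<timestamp>_<8-char suffix>.py` (the renamed `.py.executed` form is excluded). A hedged sketch of patterns consistent with those cases, not necessarily what RollbackConfig actually compiles, is shown below.

```python
# Sketch only: patterns reverse-engineered from the parametrized cases above,
# not RollbackConfig's real implementation.
import re

DIR_RE = re.compile(r"^\d+-[A-Za-z0-9]+$")            # "<timestamp>-<run_uuid>"
FILE_RE = re.compile(r"^.+_\d+_[A-Za-z0-9]{8}\.py$")  # "<scenario>_<ts>_<8 chars>.py"

# When a run_uuid is supplied, the directory check also compares the suffix to it.
assert FILE_RE.match("simple_rollback_scenario_123456789_abcdefgh.py")
assert not FILE_RE.match("simple_rollback_scenario_123456789_abcdefgh.py.executed")
assert not FILE_RE.match("simple_rollback_scenario_123456789_abc.py")
```
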
class TestRollbackCommand:
@pytest.mark.parametrize("auto_rollback", [True, False], ids=["enabled_rollback", "disabled_rollback"])
@pytest.mark.parametrize("encounter_exception", [True, False], ids=["no_exception", "with_exception"])
def test_execute_rollback_command_ignore_auto_rollback_config(self, auto_rollback, encounter_exception):
"""Test execute_rollback function with different auto rollback configurations."""
from krkn.rollback.command import execute_rollback
from krkn.rollback.config import RollbackConfig
from unittest.mock import Mock, patch
# Create mock telemetry
mock_telemetry = Mock()
# Mock search_rollback_version_files to return some test files
mock_version_files = [
"/tmp/test_versions/123456789-test-uuid/scenario_123456789_abcdefgh.py",
"/tmp/test_versions/123456789-test-uuid/scenario_123456789_ijklmnop.py"
]
with (
patch.object(RollbackConfig, 'auto', auto_rollback) as _,
patch.object(RollbackConfig, 'search_rollback_version_files', return_value=mock_version_files) as mock_search,
patch('krkn.rollback.command.execute_rollback_version_files') as mock_execute
):
if encounter_exception:
mock_execute.side_effect = Exception("Test exception")
# Call the function
result = execute_rollback(
telemetry_ocp=mock_telemetry,
run_uuid="test-uuid",
scenario_type="scenario"
)
# Verify return code
assert result == (0 if not encounter_exception else 1)
# Verify that execute_rollback_version_files was called with correct parameters
mock_execute.assert_called_once_with(
mock_telemetry,
"test-uuid",
"scenario",
ignore_auto_rollback_config=True
)
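
What the test above asserts about `execute_rollback` can be summarized in a few lines: it always forwards to `execute_rollback_version_files` with `ignore_auto_rollback_config=True` (an explicit rollback request overrides the config) and maps success or failure to return codes 0 and 1. A rough sketch of that control flow, not the actual krkn.rollback.command source:

```python
# Sketch of the control flow the test above asserts; not the real implementation.
def execute_rollback_sketch(telemetry_ocp, run_uuid, scenario_type):
    from krkn.rollback.command import execute_rollback_version_files
    try:
        # Rollback was requested explicitly, so RollbackConfig.auto is ignored.
        execute_rollback_version_files(
            telemetry_ocp,
            run_uuid,
            scenario_type,
            ignore_auto_rollback_config=True,
        )
        return 0
    except Exception:
        return 1
```
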
class TestRollbackAbstractScenarioPlugin:
@pytest.mark.parametrize("auto_rollback", [True, False], ids=["enabled_rollback", "disabled_rollback"])
@pytest.mark.parametrize("scenario_should_fail", [True, False], ids=["failing_scenario", "successful_scenario"])
def test_run_scenarios_respect_auto_rollback_config(self, auto_rollback, scenario_should_fail):
"""Test that run_scenarios respects the auto rollback configuration."""
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackConfig
from unittest.mock import Mock, patch
# Create a test scenario plugin
class TestScenarioPlugin(AbstractScenarioPlugin):
def run(self, run_uuid: str, scenario: str, krkn_config: dict, lib_telemetry, scenario_telemetry):
return 1 if scenario_should_fail else 0
def get_scenario_types(self) -> list[str]:
return ["test_scenario"]
# Create mock objects
mock_telemetry = Mock()
mock_telemetry.set_parameters_base64.return_value = "test_scenario.yaml"
mock_telemetry.get_telemetry_request_id.return_value = "test_request_id"
mock_telemetry.get_lib_kubernetes.return_value = Mock()
test_plugin = TestScenarioPlugin("test_scenario")
# Mock version files to be returned by search
mock_version_files = [
"/tmp/test_versions/123456789-test-uuid/test_scenario_123456789_abcdefgh.py"
]
with (
patch.object(RollbackConfig, 'auto', auto_rollback),
patch.object(RollbackConfig, 'versions_directory', "/tmp/test_versions"),
patch.object(RollbackConfig, 'search_rollback_version_files', return_value=mock_version_files) as mock_search,
patch('krkn.rollback.handler._parse_rollback_module') as mock_parse,
patch('krkn.scenario_plugins.abstract_scenario_plugin.utils.collect_and_put_ocp_logs'),
patch('krkn.scenario_plugins.abstract_scenario_plugin.signal_handler.signal_context') as mock_signal_context,
patch('krkn.scenario_plugins.abstract_scenario_plugin.time.sleep'),
patch('os.path.exists', return_value=True),
patch('os.rename') as mock_rename,
patch('os.remove') as mock_remove
):
# Make signal_context a no-op context manager
mock_signal_context.return_value.__enter__ = Mock(return_value=None)
mock_signal_context.return_value.__exit__ = Mock(return_value=None)
# Mock _parse_rollback_module to return test callable and content
mock_rollback_callable = Mock()
mock_rollback_content = Mock()
mock_parse.return_value = (mock_rollback_callable, mock_rollback_content)
# Call run_scenarios
test_plugin.run_scenarios(
run_uuid="test-uuid",
scenarios_list=["test_scenario.yaml"],
krkn_config={
"tunings": {"wait_duration": 0},
"telemetry": {"events_backup": False}
},
telemetry=mock_telemetry
)
# Verify results
if scenario_should_fail:
if auto_rollback:
# search_rollback_version_files should always be called when scenario fails
mock_search.assert_called_once_with("test-uuid", "test_scenario")
# When auto_rollback is True, _parse_rollback_module should be called
mock_parse.assert_called_once_with(mock_version_files[0])
# And the rollback callable should be executed
mock_rollback_callable.assert_called_once_with(mock_rollback_content, mock_telemetry)
# File should be renamed after successful execution
mock_rename.assert_called_once_with(
mock_version_files[0],
f"{mock_version_files[0]}.executed"
)
else:
# When the scenario fails but auto_rollback is False, _parse_rollback_module should NOT be called
mock_search.assert_not_called()
mock_parse.assert_not_called()
mock_rollback_callable.assert_not_called()
mock_rename.assert_not_called()
else:
mock_search.assert_called_once_with("test-uuid", "test_scenario")
# The version files are removed instead of renamed when the scenario succeeds
mock_remove.assert_called_once_with(
mock_version_files[0]
)
# When scenario succeeds, rollback should not be executed at all
mock_parse.assert_not_called()
mock_rollback_callable.assert_not_called()
mock_rename.assert_not_called()
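
For readers following the assertions above, the version-file lifecycle they encode can be sketched as follows. This is an illustration of the expected behaviour, not the AbstractScenarioPlugin code itself, and `finish_scenario_sketch` is a hypothetical helper name.

```python
# Sketch of the version-file lifecycle asserted above; illustrative only.
import os
from krkn.rollback.config import RollbackConfig
from krkn.rollback.handler import _parse_rollback_module


def finish_scenario_sketch(run_uuid, scenario_type, exit_code, telemetry_ocp):
    if exit_code == 0:
        # Scenario succeeded: its rollback files are no longer needed.
        for version_file in RollbackConfig.search_rollback_version_files(run_uuid, scenario_type):
            os.remove(version_file)
        return
    if not RollbackConfig.auto:
        # Failed scenario but auto rollback disabled: leave files for manual rollback.
        return
    for version_file in RollbackConfig.search_rollback_version_files(run_uuid, scenario_type):
        rollback_callable, rollback_content = _parse_rollback_module(version_file)
        rollback_callable(rollback_content, telemetry_ocp)
        # Keep the file, marked as executed, instead of deleting it.
        os.rename(version_file, f"{version_file}.executed")
```
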

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
Test suite for ServiceDisruptionScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_service_disruption_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.service_disruption.service_disruption_scenario_plugin import ServiceDisruptionScenarioPlugin
class TestServiceDisruptionScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ServiceDisruptionScenarioPlugin
"""
self.plugin = ServiceDisruptionScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["service_disruption_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for ServiceHijackingScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_service_hijacking_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.service_hijacking.service_hijacking_scenario_plugin import ServiceHijackingScenarioPlugin
class TestServiceHijackingScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ServiceHijackingScenarioPlugin
"""
self.plugin = ServiceHijackingScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["service_hijacking_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for ShutDownScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_shut_down_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.shut_down.shut_down_scenario_plugin import ShutDownScenarioPlugin
class TestShutDownScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ShutDownScenarioPlugin
"""
self.plugin = ShutDownScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["cluster_shut_down_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""
Test suite for SynFloodScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_syn_flood_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from krkn.scenario_plugins.syn_flood.syn_flood_scenario_plugin import SynFloodScenarioPlugin
class TestSynFloodScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for SynFloodScenarioPlugin
"""
self.plugin = SynFloodScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["syn_flood_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for TimeActionsScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_time_actions_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.time_actions.time_actions_scenario_plugin import TimeActionsScenarioPlugin
class TestTimeActionsScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for TimeActionsScenarioPlugin
"""
self.plugin = TimeActionsScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["time_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

tests/test_virt_checker.py (590 lines)
View File

@@ -0,0 +1,590 @@
#!/usr/bin/env python3
"""
Test suite for VirtChecker class
This test file provides comprehensive coverage for the main functionality of VirtChecker:
- Initialization with various configurations
- VM access checking (both virtctl and disconnected modes)
- Disconnected mode with IP/node changes
- Thread management
- Post-check validation
Usage:
python -m coverage run -a -m unittest tests/test_virt_checker.py -v
Note: This test file uses mocks extensively to avoid needing actual Kubernetes/KubeVirt infrastructure.
Created By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, patch
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from krkn.utils.VirtChecker import VirtChecker
# Mock VirtCheck class patched in place of krkn_lib.models.telemetry.models.VirtCheck
class MockVirtCheck:
"""Mock VirtCheck class for testing"""
def __init__(self, data):
self.vm_name = data.get('vm_name', '')
self.ip_address = data.get('ip_address', '')
self.namespace = data.get('namespace', '')
self.node_name = data.get('node_name', '')
self.new_ip_address = data.get('new_ip_address', '')
self.status = data.get('status', False)
self.start_timestamp = data.get('start_timestamp', '')
self.end_timestamp = data.get('end_timestamp', '')
self.duration = data.get('duration', 0)
class TestVirtChecker(unittest.TestCase):
"""Test suite for VirtChecker class"""
def setUp(self):
"""Set up test fixtures before each test method"""
self.mock_krkn_lib = MagicMock()
# Mock VMI data
self.mock_vmi_1 = {
"metadata": {"name": "test-vm-1", "namespace": "test-namespace"},
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
self.mock_vmi_2 = {
"metadata": {"name": "test-vm-2", "namespace": "test-namespace"},
"status": {
"nodeName": "worker-2",
"interfaces": [{"ipAddress": "192.168.1.11"}]
}
}
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_init_with_empty_namespace(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with empty namespace (should skip checks)"""
def yaml_getter(config, key, default):
if key == "namespace":
return ""
return default
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": ""},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
# Should set batch_size to 0 and not initialize plugin
self.assertEqual(checker.batch_size, 0)
mock_plugin_class.assert_not_called()
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_regex_namespace(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with regex namespace pattern"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": "test-*"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(len(checker.vm_list), 0)
self.assertEqual(len(checker.vm_list), 2)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_with_vm_name(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization with specific VM names"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
# Test with VM name pattern
checker = VirtChecker(
{"namespace": "test-namespace", "name": "test-vm-.*"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker.batch_size, 0)
self.assertEqual(len(checker.vm_list), 2)
# Test with specific VM name
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
checker2 = VirtChecker(
{"namespace": "test-namespace", "name": "test-vm-1"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker2.batch_size, 0)
self.assertEqual(len(checker2.vm_list), 1)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_with_node_names(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker initialization filtering by node names"""
# Setup mock plugin with VMI data
mock_plugin = MagicMock()
mock_plugin.vmis_list = [self.mock_vmi_1, self.mock_vmi_2]
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
return config.get(key, default)
mock_yaml.side_effect = yaml_getter
# Test filtering by node name - should only include VMs on worker-2
checker = VirtChecker(
{"namespace": "test-namespace", "node_names": "worker-2"},
iterations=5,
krkn_lib=self.mock_krkn_lib
)
self.assertGreater(checker.batch_size, 0)
# Only test-vm-2 is on worker-2, so vm_list should have 1 VM
self.assertEqual(len(checker.vm_list), 1)
self.assertEqual(checker.vm_list[0].vm_name, "test-vm-2")
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_get_vm_access_success(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test get_vm_access returns True when VM is accessible"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock successful access
mock_invoke.return_value = "True"
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result = checker.get_vm_access("test-vm", "test-namespace")
self.assertTrue(result)
# Should try first command and succeed
self.assertGreaterEqual(mock_invoke.call_count, 1)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_get_vm_access_failure(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test get_vm_access returns False when VM is not accessible"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed access
mock_invoke.return_value = "False"
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result = checker.get_vm_access("test-vm", "test-namespace")
self.assertFalse(result)
# Should try both commands
self.assertEqual(mock_invoke.call_count, 2)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_success(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access with successful connection"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock successful disconnected access
mock_invoke.side_effect = ["some output", "True"]
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertIsNone(new_ip)
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_new_ip(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access when VM has new IP address"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed first attempt, successful second with new IP
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.20"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.20")
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_new_node(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access when VM moved to new node"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed attempts, successful on new node
# Call sequence: debug_check, initial_check, check_on_new_node
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-2",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.10")
self.assertEqual(new_node, "worker-2")
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.invoke_no_exit')
def test_check_disconnected_access_with_ssh_node_fallback(self, mock_invoke, mock_plugin_class, mock_yaml):
"""Test check_disconnected_access falls back to ssh_node"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
elif key == "ssh_node":
return "worker-0"
return default
mock_yaml.side_effect = yaml_getter
# Mock failed attempts on original node, successful on ssh_node fallback
# Call sequence: debug_check, initial_check_on_worker-1, fallback_check_on_ssh_node
# Since IP and node haven't changed, it goes directly to ssh_node fallback
mock_invoke.side_effect = ["some output", "False", "True"]
mock_vmi = {
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": "192.168.1.10"}]
}
}
mock_plugin.get_vmi = MagicMock(return_value=mock_vmi)
checker = VirtChecker(
{"namespace": "test-ns", "ssh_node": "worker-0"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
checker.kube_vm_plugin = mock_plugin
result, new_ip, new_node = checker.check_disconnected_access(
"192.168.1.10",
"worker-1",
"test-vm"
)
self.assertTrue(result)
self.assertEqual(new_ip, "192.168.1.10")
self.assertIsNone(new_node)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_thread_join(self, mock_plugin_class, mock_yaml):
"""Test thread_join waits for all threads"""
mock_plugin = MagicMock()
mock_plugin.vmis_list = []
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
checker = VirtChecker(
{"namespace": "test-ns"},
iterations=1,
krkn_lib=self.mock_krkn_lib
)
# Create mock threads
mock_thread_1 = MagicMock()
mock_thread_2 = MagicMock()
checker.threads = [mock_thread_1, mock_thread_2]
checker.thread_join()
mock_thread_1.join.assert_called_once()
mock_thread_2.join.assert_called_once()
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_init_exception_handling(self, mock_plugin_class, mock_yaml):
"""Test VirtChecker handles exceptions during initialization"""
mock_plugin = MagicMock()
mock_plugin.init_clients.side_effect = Exception("Connection error")
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
return default
mock_yaml.side_effect = yaml_getter
config = {"namespace": "test-ns"}
# Should not raise exception
checker = VirtChecker(
config,
iterations=1,
krkn_lib=self.mock_krkn_lib
)
# VM list should be empty due to exception
self.assertEqual(len(checker.vm_list), 0)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
def test_batch_size_calculation(self, mock_plugin_class, mock_yaml):
"""Test batch size calculation based on VM count and thread limit"""
mock_plugin = MagicMock()
# Create 25 mock VMIs
mock_vmis = []
for i in range(25):
vmi = {
"metadata": {"name": f"vm-{i}", "namespace": "test-ns"},
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": f"192.168.1.{i}"}]
}
}
mock_vmis.append(vmi)
mock_plugin.vmis_list = mock_vmis
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
elif key == "node_names":
return ""
return default
mock_yaml.side_effect = yaml_getter
config = {"namespace": "test-ns"}
checker = VirtChecker(
config,
iterations=5,
krkn_lib=self.mock_krkn_lib,
threads_limit=10
)
# 25 VMs / 10 threads = 3 VMs per batch (ceiling)
self.assertEqual(checker.batch_size, 3)
@patch('krkn_lib.models.telemetry.models.VirtCheck', new=MockVirtCheck)
@patch('krkn.utils.VirtChecker.get_yaml_item_value')
@patch('krkn.utils.VirtChecker.KubevirtVmOutageScenarioPlugin')
@patch('krkn.utils.VirtChecker.threading.Thread')
def test_batch_list_includes_last_item(self, mock_thread_class, mock_plugin_class, mock_yaml):
"""Test that batch_list includes the last item when batches don't divide evenly"""
mock_plugin = MagicMock()
# Create 21 mock VMIs (the specific case mentioned in the bug report)
mock_vmis = []
for i in range(21):
vmi = {
"metadata": {"name": f"vm-{i}", "namespace": "test-ns"},
"status": {
"nodeName": "worker-1",
"interfaces": [{"ipAddress": f"192.168.1.{i}"}]
}
}
mock_vmis.append(vmi)
mock_plugin.vmis_list = mock_vmis
mock_plugin_class.return_value = mock_plugin
def yaml_getter(config, key, default):
if key == "namespace":
return "test-ns"
elif key == "node_names":
return ""
return default
mock_yaml.side_effect = yaml_getter
config = {"namespace": "test-ns"}
checker = VirtChecker(
config,
iterations=5,
krkn_lib=self.mock_krkn_lib,
threads_limit=5 # This gives batch_size=5 (ceiling of 21/5=4.2)
)
# 21 VMs / 5 threads = 5 VMs per batch (ceiling)
self.assertEqual(checker.batch_size, 5)
self.assertEqual(len(checker.vm_list), 21)
# Track the sublists passed to each thread
captured_sublists = []
def capture_args(*args, **kwargs):
# threading.Thread is called with target=..., name=..., args=(sublist, queue)
if 'args' in kwargs:
sublist, queue = kwargs['args']
captured_sublists.append(sublist)
mock_thread = MagicMock()
if 'name' in kwargs:
mock_thread.name = kwargs['name']
return mock_thread
mock_thread_class.side_effect = capture_args
# Create a mock queue
mock_queue = MagicMock()
# Call batch_list
checker.batch_list(mock_queue)
# Verify all 21 items are included across all batches
all_items_in_batches = []
for sublist in captured_sublists:
all_items_in_batches.extend(sublist)
# Check that we have exactly 21 items
self.assertEqual(len(all_items_in_batches), 21)
# Verify the last batch includes the last item (vm-20)
last_batch = captured_sublists[-1]
self.assertGreater(len(last_batch), 0, "Last batch should not be empty")
# Verify no duplicate items across batches
all_vm_names = [vm.vm_name for vm in all_items_in_batches]
self.assertEqual(len(all_vm_names), len(set(all_vm_names)), "No duplicate items should be in batches")
if __name__ == "__main__":
unittest.main()
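
The two batching tests above fix the arithmetic: the batch size is the ceiling of the VM count over the thread limit (25/10 → 3, 21/5 → 5), and the final partial slice must still be emitted with no duplicates. A minimal sketch of batching that satisfies those assertions; the function name is illustrative, not VirtChecker's API.

```python
# Sketch of ceiling-based batching consistent with the tests above.
import math


def make_batches(vm_list, threads_limit):
    if not vm_list:
        return []
    batch_size = math.ceil(len(vm_list) / threads_limit)
    # Slicing past the end is safe, so the final partial batch (e.g. the 21st VM
    # with batch_size=5) is always included exactly once.
    return [vm_list[i:i + batch_size] for i in range(0, len(vm_list), batch_size)]


assert math.ceil(25 / 10) == 3
assert sum(len(b) for b in make_batches(list(range(21)), 5)) == 21
```
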

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Test suite for ZoneOutageScenarioPlugin class
Usage:
python -m coverage run -a -m unittest tests/test_zone_outage_scenario_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.zone_outage.zone_outage_scenario_plugin import ZoneOutageScenarioPlugin
class TestZoneOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for ZoneOutageScenarioPlugin
"""
self.plugin = ZoneOutageScenarioPlugin()
def test_get_scenario_types(self):
"""
Test get_scenario_types returns correct scenario type
"""
result = self.plugin.get_scenario_types()
self.assertEqual(result, ["zone_outages_scenarios"])
self.assertEqual(len(result), 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -2,5 +2,5 @@ numpy
pandas
requests
Flask==2.2.5
Werkzeug==3.0.3
Werkzeug==3.0.6
flasgger==0.9.5