Compare commits

...

32 Commits

Author SHA1 Message Date
Paige Patton
4f250c9601 adding affected nodes to affectednodestatus (#884)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 4m20s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-13 20:54:13 -04:00
Paige Patton
6480adc00a adding setting own image for network chaos (#883)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m5s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-13 17:49:47 -04:00
Paige Patton
5002f210ae removing dashboard installation
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m9s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-05 11:27:41 -04:00
Paige Patton
62c5afa9a2 updated done items in roadmap
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m52s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-08-01 13:23:23 -04:00
Paige Patton
c109fc0b17 adding elastic installation into krkn tests
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 6m36s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-31 10:41:31 -04:00
Tullio Sebastiani
fff675f3dd added service account to Network Chaos NG workload (#870)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m56s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-23 10:17:50 +02:00
Naga Ravi Chaitanya Elluri
c125e5acf7 Update network scenario image
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m34s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
This commit updates fedora tools image reference used by the network scenarios
to the one hosted in the krkn-chaos quay org. This also fixes the issues with
RHACS flagging runs when using latest tag by using tools tag instead.

Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2025-07-22 14:29:00 -04:00
Naga Ravi Chaitanya Elluri
ca6995a1a1 [Snyk] Fix for 3 vulnerabilities (#859)
* fix: requirements.txt to reduce vulnerabilities


The following vulnerabilities are fixed by pinning transitive dependencies:
- https://snyk.io/vuln/SNYK-PYTHON-PROTOBUF-10364902
- https://snyk.io/vuln/SNYK-PYTHON-URLLIB3-10390193
- https://snyk.io/vuln/SNYK-PYTHON-URLLIB3-10390194

* partial vulnerability fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-22 16:50:31 +02:00
Sahil Shah
50cf91ac9e Disable SSL verification for IBM node scenarios and fix node reboot s… (#861)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m9s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* Disable SSL verification for IBM node scenarios and fix node reboot scenario

Signed-off-by: Sahil Shah <sahshah@redhat.com>

* adding disable ssl as a scenario parameter for ibmcloud

Signed-off-by: Sahil Shah <sahshah@redhat.com>

---------

Signed-off-by: Sahil Shah <sahshah@redhat.com>
2025-07-16 12:48:45 -04:00
Tullio Sebastiani
11069c6982 added tolerations to node network filter pod deployment (#867) 2025-07-16 17:11:46 +02:00
Charles Uneze
106d9bf1ae A working kind config (#866)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 5m13s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Charles Uneze <charlesniklaus@gmail.com>
2025-07-15 10:25:01 -04:00
Abhinav Sharma
17f832637c feat: add optional node-name field to hog scenarios with precedence over node-selector (#831)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m31s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2025-07-11 14:10:16 -04:00
Paige Patton
0e5c8c55a4 adding details of node for hog failure
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m23s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 16:49:28 -04:00
Tullio Sebastiani
9d9a6f9b80 added missing parameters to node-network-filter + added default values
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 13:22:50 -04:00
Anshuman Panda
f8fe2ae5b7 Refactor: to use krkn-lib for getting and remove invoke funct. usage node IP
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:21:10 -04:00
Paige Patton
77b1dd32c7 adding kubevirt with pod timing
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:37 -04:00
Anshuman Panda
9df727ccf5 Ensure metrics are always saved with improved local fallback
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:07 -04:00
Tullio Sebastiani
70c8fec705 added pod-network-filter funtest (#863)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m37s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* added pod-network-filter funtest

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* updated kind settings

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 09:35:59 +02:00
Abhinav Sharma
0731144a6b Add support for triggering kubevirt VM outages (#816)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m2s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* add requirement for kubevirt_vm_outage

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add initial init and kubevirt_plugin files

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add scenario in  kubevirt-vm-outage.yaml

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init, get_scenario_types, run and placeholder for inject and recover functions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init client, execute_scenario, validate environment, inject and get_VMinstance fucntions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add test for kubevirt_vm_outage feature

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* improve KubeVirt recovery logic and update dependencies, for kubevirt

Signed-off-by: Paige Patton <prubenda@redhat.com>

* refactor(kubevirt): use KrknKubernetes client for KubeVirt operations

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: Add auto-restart disable option and simplify code

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: remove kubevirt external package used.

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* adding few changes and scenario in config file

Signed-off-by: Paige Patton <prubenda@redhat.com>

* removing docs

Signed-off-by: Paige Patton <prubenda@redhat.com>

* no affected pods

Signed-off-by: Paige Patton <prubenda@redhat.com>

---------

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Paige Patton <prubenda@redhat.com>
2025-07-08 14:04:57 -04:00
yogananth subramanian
9337052e7b Fix bm_node_scenarios.py
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m29s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Fix the logic in disk disruption scenario, which was returning the right set of disks to be off-lined.

Signed-off-by: Yogananth Subramanian <ysubrama@redhat.com>
2025-07-07 13:49:33 -04:00
yogananth subramanian
dc8d7ad75b Add disk detach/attach scenario to baremetal node actions (#855)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Has been cancelled
Functional & Unit Tests / Generate Coverage Badge (push) Has been cancelled
- Implemented methods for detaching and attaching disks to baremetal nodes.
- Added a new scenario `node_disk_detach_attach_scenario` to manage disk operations.
- Updated the YAML configuration to include the new scenario with disk details.

Signed-off-by: Yogananth Subramanian <ysubrama@redhat.com>
2025-07-03 17:18:57 +02:00
Paige Patton
1cc44e1f18 adding non native verison of pod scenarios (#847)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-03 15:46:13 +02:00
Paige Patton
c8190fd1c1 adding pod test (#858)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-03 15:00:51 +02:00
Tullio Sebastiani
9078b35e46 updated krkn-lib
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-02 17:30:58 +02:00
Tullio Sebastiani
e6b1665aa1 added toleration to schedule pod on master
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-30 10:30:47 +02:00
Tullio Sebastiani
c56819365c minor nits fixes
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-27 15:12:45 +02:00
Tullio Sebastiani
6a657576cb api refactoring + pod network filter scenario
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-26 15:51:35 +02:00
Tullio Sebastiani
f04f1f1101 added workload image as scenario parameter (#854)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m58s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* added workload image as scenario parameter

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* renamed workload_image to image

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-25 17:08:59 +02:00
Naga Ravi Chaitanya Elluri
bddbd42f8c Expose kube_check parameter for baremetal node scenarios
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m7s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2025-06-16 11:43:32 -04:00
dependabot[bot]
630dbd805b Bump requests from 2.32.2 to 2.32.4
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m38s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Bumps [requests](https://github.com/psf/requests) from 2.32.2 to 2.32.4.
- [Release notes](https://github.com/psf/requests/releases)
- [Changelog](https://github.com/psf/requests/blob/main/HISTORY.md)
- [Commits](https://github.com/psf/requests/compare/v2.32.2...v2.32.4)

---
updated-dependencies:
- dependency-name: requests
  dependency-version: 2.32.4
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-06-11 12:54:11 -04:00
Paige Patton
10d26ba50e adding kube check into gcp zone'
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-06-11 12:53:47 -04:00
Naga Ravi Chaitanya Elluri
d47286ae21 Expose parallel option in the baremetal node scenarios
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m14s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2025-06-09 09:48:04 -04:00
70 changed files with 1930 additions and 2470 deletions

View File

@@ -14,34 +14,16 @@ jobs:
uses: actions/checkout@v3
- name: Create multi-node KinD cluster
uses: redhat-chaos/actions/kind@main
- name: Install Helm & add repos
run: |
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo add stable https://charts.helm.sh/stable
helm repo update
- name: Deploy prometheus & Port Forwarding
run: |
kubectl create namespace prometheus-k8s
helm install \
--wait --timeout 360s \
kind-prometheus \
prometheus-community/kube-prometheus-stack \
--namespace prometheus-k8s \
--set prometheus.service.nodePort=30000 \
--set prometheus.service.type=NodePort \
--set grafana.service.nodePort=31000 \
--set grafana.service.type=NodePort \
--set alertmanager.service.nodePort=32000 \
--set alertmanager.service.type=NodePort \
--set prometheus-node-exporter.service.nodePort=32001 \
--set prometheus-node-exporter.service.type=NodePort \
--set prometheus.prometheusSpec.maximumStartupDurationSeconds=300
SELECTOR=`kubectl -n prometheus-k8s get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'`
POD_NAME=`kubectl -n prometheus-k8s get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'`
kubectl -n prometheus-k8s port-forward $POD_NAME 9090:9090 &
sleep 5
uses: redhat-chaos/actions/prometheus@main
- name: Deploy Elasticsearch
with:
ELASTIC_URL: ${{ vars.ELASTIC_URL }}
ELASTIC_PORT: ${{ vars.ELASTIC_PORT }}
ELASTIC_USER: ${{ vars.ELASTIC_USER }}
ELASTIC_PASSWORD: ${{ vars.ELASTIC_PASSWORD }}
uses: redhat-chaos/actions/elastic@main
- name: Install Python
uses: actions/setup-python@v4
with:
@@ -55,6 +37,8 @@ jobs:
- name: Deploy test workloads
run: |
es_pod_name=$(kubectl get pods -l "app.kubernetes.io/instance=elasticsearch" -o name)
kubectl --namespace default port-forward $es_pod_name 9200 &
kubectl apply -f CI/templates/outage_pod.yaml
kubectl wait --for=condition=ready pod -l scenario=outage --timeout=300s
kubectl apply -f CI/templates/container_scenario_pod.yaml
@@ -79,15 +63,20 @@ jobs:
yq -i '.kraken.port="8081"' CI/config/common_test_config.yaml
yq -i '.kraken.signal_address="0.0.0.0"' CI/config/common_test_config.yaml
yq -i '.kraken.performance_monitoring="localhost:9090"' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
yq -i '.elastic.enable_elastic=True' CI/config/common_test_config.yaml
echo "test_service_hijacking" > ./CI/tests/functional_tests
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
echo "test_namespace" >> ./CI/tests/functional_tests
echo "test_net_chaos" >> ./CI/tests/functional_tests
echo "test_time" >> ./CI/tests/functional_tests
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Push on main only steps + all other functional to collect coverage
@@ -105,18 +94,22 @@ jobs:
yq -i '.kraken.port="8081"' CI/config/common_test_config.yaml
yq -i '.kraken.signal_address="0.0.0.0"' CI/config/common_test_config.yaml
yq -i '.kraken.performance_monitoring="localhost:9090"' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_port=9200' CI/config/common_test_config.yaml
yq -i '.elastic.elastic_url="https://localhost"' CI/config/common_test_config.yaml
yq -i '.telemetry.username="${{secrets.TELEMETRY_USERNAME}}"' CI/config/common_test_config.yaml
yq -i '.telemetry.password="${{secrets.TELEMETRY_PASSWORD}}"' CI/config/common_test_config.yaml
echo "test_telemetry" > ./CI/tests/functional_tests
echo "test_service_hijacking" >> ./CI/tests/functional_tests
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
echo "test_namespace" >> ./CI/tests/functional_tests
echo "test_net_chaos" >> ./CI/tests/functional_tests
echo "test_time" >> ./CI/tests/functional_tests
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Final common steps
- name: Run Functional tests
@@ -127,20 +120,24 @@ jobs:
cat ./CI/results.markdown >> $GITHUB_STEP_SUMMARY
echo >> $GITHUB_STEP_SUMMARY
- name: Upload CI logs
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v4
with:
name: ci-logs
path: CI/out
if-no-files-found: error
- name: Collect coverage report
if: ${{ success() || failure() }}
run: |
python -m coverage html
python -m coverage json
- name: Publish coverage report to job summary
if: ${{ success() || failure() }}
run: |
pip install html2text
html2text --ignore-images --ignore-links -b 0 htmlcov/index.html >> $GITHUB_STEP_SUMMARY
- name: Upload coverage data
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v4
with:
name: coverage

View File

@@ -2,19 +2,14 @@ kraken:
distribution: kubernetes # Distribution can be kubernetes or openshift.
kubeconfig_path: ~/.kube/config # Path to kubeconfig.
exit_on_failure: False # Exit when a post action scenario fails.
litmus_version: v1.13.6 # Litmus version to install.
litmus_uninstall: False # If you want to uninstall litmus if failure.
chaos_scenarios: # List of policies/chaos scenarios to load.
- $scenario_type: # List of chaos pod scenarios to load.
- $scenario_file
$post_config
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed.
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal.
performance_monitoring:
deploy_dashboards: False # Install a mutable grafana and load the performance dashboards. Enable this only when running on OpenShift.
repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
capture_metrics: False
metrics_profile_path: config/metrics-aggregated.yaml
prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
@@ -52,8 +47,6 @@ telemetry:
telemetry_group: "funtests"
elastic:
enable_elastic: False
collect_metrics: False
collect_alerts: False
verify_certs: False
elastic_url: "https://192.168.39.196" # To track results in elasticsearch, give url to server here; will post telemetry details when url and index not blank
elastic_port: 32766

View File

@@ -8,9 +8,9 @@ spec:
hostNetwork: true
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c
- |
sleep infinity
sleep infinity

View File

@@ -0,0 +1,29 @@
apiVersion: v1
kind: Pod
metadata:
name: pod-network-filter-test
labels:
app.kubernetes.io/name: pod-network-filter
spec:
containers:
- name: nginx
image: quay.io/krkn-chaos/krkn-funtests:pod-network-filter
ports:
- containerPort: 5000
name: pod-network-prt
---
apiVersion: v1
kind: Service
metadata:
name: pod-network-filter-service
spec:
selector:
app.kubernetes.io/name: pod-network-filter
type: NodePort
ports:
- name: pod-network-filter-svc
protocol: TCP
port: 80
targetPort: pod-network-prt
nodePort: 30037

View File

@@ -8,9 +8,9 @@ spec:
hostNetwork: true
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c
- |
sleep infinity
sleep infinity

View File

@@ -13,6 +13,10 @@ function functional_test_app_outage {
export scenario_type="application_outages_scenarios"
export scenario_file="scenarios/openshift/app_outage.yaml"
export post_config=""
kubectl get services -A
kubectl get pods
envsubst < CI/config/common_test_config.yaml > CI/config/app_outage.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/app_outage.yaml
echo "App outage scenario test: Success"

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_cpu_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/cpu-hog.yml"

View File

@@ -5,12 +5,13 @@ source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_io_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/io-hog.yml"
export post_config=""
cat $scenario_file
envsubst < CI/config/common_test_config.yaml > CI/config/io_hog.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/io_hog.yaml
echo "IO Hog: Success"

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_memory_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/memory-hog.yml"
export post_config=""

18
CI/tests/test_pod.sh Executable file
View File

@@ -0,0 +1,18 @@
set -xeEo pipefail
source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_pod_crash {
export scenario_type="pod_disruption_scenarios"
export scenario_file="scenarios/kind/pod_etcd.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/pod_config.yaml
echo "Pod disruption scenario test: Success"
}
functional_test_pod_crash

View File

@@ -0,0 +1,59 @@
function functional_pod_network_filter {
export SERVICE_URL="http://localhost:8889"
export scenario_type="network_chaos_ng_scenarios"
export scenario_file="scenarios/kube/pod-network-filter.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_network_filter.yaml
yq -i '.[0].test_duration=10' scenarios/kube/pod-network-filter.yml
yq -i '.[0].label_selector=""' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ingress=false' scenarios/kube/pod-network-filter.yml
yq -i '.[0].egress=true' scenarios/kube/pod-network-filter.yml
yq -i '.[0].target="pod-network-filter-test"' scenarios/kube/pod-network-filter.yml
yq -i '.[0].protocols=["tcp"]' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ports=[443]' scenarios/kube/pod-network-filter.yml
## Test webservice deployment
kubectl apply -f ./CI/templates/pod_network_filter.yaml
COUNTER=0
while true
do
curl $SERVICE_URL
EXITSTATUS=$?
if [ "$EXITSTATUS" -eq "0" ]
then
break
fi
sleep 1
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
done
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > /dev/null 2>&1 &
PID=$!
# wait until the dns resolution starts failing and the service returns 400
DNS_FAILURE_STATUS=0
while true
do
OUT_STATUS_CODE=$(curl -X GET -s -o /dev/null -I -w "%{http_code}" $SERVICE_URL)
if [ "$OUT_STATUS_CODE" -eq "404" ]
then
DNS_FAILURE_STATUS=404
fi
if [ "$DNS_FAILURE_STATUS" -eq "404" ] && [ "$OUT_STATUS_CODE" -eq "200" ]
then
echo "service restored"
break
fi
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
sleep 2
done
wait $PID
}
functional_pod_network_filter

View File

@@ -2,11 +2,11 @@
Following are a list of enhancements that we are planning to work on adding support in Krkn. Of course any help/contributions are greatly appreciated.
- [ ] [Ability to run multiple chaos scenarios in parallel under load to mimic real world outages](https://github.com/krkn-chaos/krkn/issues/424)
- [x] [Ability to run multiple chaos scenarios in parallel under load to mimic real world outages](https://github.com/krkn-chaos/krkn/issues/424)
- [x] [Centralized storage for chaos experiments artifacts](https://github.com/krkn-chaos/krkn/issues/423)
- [ ] [Support for causing DNS outages](https://github.com/krkn-chaos/krkn/issues/394)
- [x] [Support for causing DNS outages](https://github.com/krkn-chaos/krkn/issues/394)
- [x] [Chaos recommender](https://github.com/krkn-chaos/krkn/tree/main/utils/chaos-recommender) to suggest scenarios having probability of impacting the service under test using profiling results
- [] Chaos AI integration to improve test coverage while reducing fault space to save costs and execution time
- [x] Chaos AI integration to improve test coverage while reducing fault space to save costs and execution time [krkn-chaos-ai](https://github.com/krkn-chaos/krkn-chaos-ai)
- [x] [Support for pod level network traffic shaping](https://github.com/krkn-chaos/krkn/issues/393)
- [ ] [Ability to visualize the metrics that are being captured by Kraken and stored in Elasticsearch](https://github.com/krkn-chaos/krkn/issues/124)
- [x] Support for running all the scenarios of Kraken on Kubernetes distribution - see https://github.com/krkn-chaos/krkn/issues/185, https://github.com/redhat-chaos/krkn/issues/186
@@ -14,3 +14,7 @@ Following are a list of enhancements that we are planning to work on adding supp
- [x] [Switch documentation references to Kubernetes](https://github.com/krkn-chaos/krkn/issues/495)
- [x] [OCP and Kubernetes functionalities segregation](https://github.com/krkn-chaos/krkn/issues/497)
- [x] [Krknctl - client for running Krkn scenarios with ease](https://github.com/krkn-chaos/krknctl)
- [x] [AI Chat bot to help get started with Krkn and commands](https://github.com/krkn-chaos/krkn-lightspeed)
- [ ] [Ability to roll back cluster to original state if chaos fails](https://github.com/krkn-chaos/krkn/issues/804)
- [ ] Add recovery time metrics to each scenario for each better regression analysis
- [ ] [Add resiliency scoring to chaos scenarios ran on cluster](https://github.com/krkn-chaos/krkn/issues/125)

View File

@@ -46,7 +46,10 @@ kraken:
- syn_flood_scenarios:
- scenarios/kube/syn_flood.yaml
- network_chaos_ng_scenarios:
- scenarios/kube/network-filter.yml
- scenarios/kube/pod-network-filter.yml
- scenarios/kube/node-network-filter.yml
- kubevirt_vm_outage:
- scenarios/kubevirt/kubevirt-vm-outage.yaml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed
@@ -54,9 +57,7 @@ cerberus:
check_applicaton_routes: False # When enabled will look for application unavailability using the routes specified in the cerberus config and fails the run
performance_monitoring:
deploy_dashboards: False # Install a mutable grafana and load the performance dashboards. Enable this only when running on OpenShift
repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
prometheus_url: '' # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_url: '' # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
uuid: # uuid for the run is generated by default if not set
enable_alerts: False # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error

View File

@@ -7,10 +7,8 @@ kraken:
signal_state: RUN # Will wait for the RUN signal when set to PAUSE before running the scenarios, refer docs/signal.md for more details
signal_address: 0.0.0.0 # Signal listening address
chaos_scenarios: # List of policies/chaos scenarios to load
- plugin_scenarios:
- scenarios/kind/scheduler.yml
- node_scenarios:
- scenarios/kind/node_scenarios_example.yml
- pod_disruption_scenarios:
- scenarios/kube/pod.yml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed
@@ -18,15 +16,24 @@ cerberus:
check_applicaton_routes: False # When enabled will look for application unavailability using the routes specified in the cerberus config and fails the run
performance_monitoring:
deploy_dashboards: False # Install a mutable grafana and load the performance dashboards. Enable this only when running on OpenShift
repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
uuid: # uuid for the run is generated by default if not set
enable_alerts: False # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error
alert_profile: config/alerts.yaml # Path to alert profile with the prometheus queries
elastic:
enable_elastic: False
tunings:
wait_duration: 60 # Duration to wait between each chaos scenario
iterations: 1 # Number of times to execute the scenarios
daemon_mode: False # Iterations are set to infinity which means that the kraken will cause chaos forever
telemetry:
enabled: False # enable/disables the telemetry collection feature
archive_path: /tmp # local path where the archive files will be temporarly stored
events_backup: False # enables/disables cluster events collection
logs_backup: False
health_checks: # Utilizing health check endpoints to observe application behavior during chaos injection.

View File

@@ -17,8 +17,6 @@ cerberus:
check_applicaton_routes: False # When enabled will look for application unavailability using the routes specified in the cerberus config and fails the run
performance_monitoring:
deploy_dashboards: False # Install a mutable grafana and load the performance dashboards. Enable this only when running on OpenShift
repo: "https://github.com/cloud-bulldozer/performance-dashboards.git"
prometheus_url: # The prometheus url/route is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes.
prometheus_bearer_token: # The bearer token is automatically obtained in case of OpenShift, please set it when the distribution is Kubernetes. This is needed to authenticate with prometheus.
uuid: # uuid for the run is generated by default if not set

View File

@@ -10,7 +10,7 @@ RUN go mod edit -go 1.23.1 &&\
go get github.com/docker/docker@v25.0.6&&\
go get github.com/opencontainers/runc@v1.1.14&&\
go get github.com/go-git/go-git/v5@v5.13.0&&\
go get golang.org/x/net@v0.36.0&&\
go get golang.org/x/net@v0.38.0&&\
go get github.com/containerd/containerd@v1.7.27&&\
go get golang.org/x/oauth2@v0.27.0&&\
go get golang.org/x/crypto@v0.35.0&&\
@@ -47,7 +47,7 @@ RUN if [ -n "$PR_NUMBER" ]; then git fetch origin pull/${PR_NUMBER}/head:pr-${PR
RUN if [ -n "$TAG" ]; then git checkout "$TAG";fi
RUN python3.9 -m ensurepip --upgrade --default-pip
RUN python3.9 -m pip install --upgrade pip setuptools==70.0.0
RUN python3.9 -m pip install --upgrade pip setuptools==78.1.1
RUN pip3.9 install -r requirements.txt
RUN pip3.9 install jsonschema

View File

@@ -5,6 +5,8 @@ nodes:
extraPortMappings:
- containerPort: 30036
hostPort: 8888
- containerPort: 30037
hostPort: 8889
- role: control-plane
- role: control-plane
- role: worker

View File

@@ -19,7 +19,6 @@ def invoke_no_exit(command, timeout=None):
output = ""
try:
output = subprocess.check_output(command, shell=True, universal_newlines=True, timeout=timeout)
logging.info("output " + str(output))
except Exception as e:
logging.error("Failed to run %s, error: %s" % (command, e))
return str(e)

View File

@@ -1,28 +0,0 @@
import subprocess
import logging
import git
import sys
# Installs a mutable grafana on the Kubernetes/OpenShift cluster and loads the performance dashboards
def setup(repo, distribution):
if distribution == "kubernetes":
command = "cd performance-dashboards/dittybopper && ./k8s-deploy.sh"
elif distribution == "openshift":
command = "cd performance-dashboards/dittybopper && ./deploy.sh"
else:
logging.error("Provided distribution: %s is not supported" % (distribution))
sys.exit(1)
delete_repo = "rm -rf performance-dashboards || exit 0"
logging.info(
"Cloning, installing mutable grafana on the cluster and loading the dashboards"
)
try:
# delete repo to clone the latest copy if exists
subprocess.run(delete_repo, shell=True, universal_newlines=True, timeout=45)
# clone the repo
git.Repo.clone_from(repo, "performance-dashboards")
# deploy performance dashboards
subprocess.run(command, shell=True, universal_newlines=True)
except Exception as e:
logging.error("Failed to install performance-dashboards, error: %s" % (e))

View File

@@ -9,6 +9,7 @@ import logging
import urllib3
import sys
import json
import tempfile
import yaml
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -251,11 +252,29 @@ def metrics(
metric[k] = v
metric['timestamp'] = str(datetime.datetime.now())
metrics_list.append(metric.copy())
if elastic:
save_metrics = False
if elastic is not None and elastic_metrics_index is not None:
result = elastic.upload_metrics_to_elasticsearch(
run_uuid=run_uuid, index=elastic_metrics_index, raw_data=metrics_list
)
if result == -1:
logging.error("failed to save metrics on ElasticSearch")
save_metrics = True
else:
save_metrics = True
if save_metrics:
local_dir = os.path.join(tempfile.gettempdir(), "krkn_metrics")
os.makedirs(local_dir, exist_ok=True)
local_file = os.path.join(local_dir, f"{elastic_metrics_index}_{run_uuid}.json")
try:
with open(local_file, "w") as f:
json.dump({
"run_uuid": run_uuid,
"metrics": metrics_list
}, f, indent=2)
logging.info(f"Metrics saved to {local_file}")
except Exception as e:
logging.error(f"Failed to save metrics to {local_file}: {e}")
return metrics_list

View File

@@ -117,3 +117,5 @@ class AbstractScenarioPlugin(ABC):
logging.info(f"wating {wait_duration} before running the next scenario")
time.sleep(wait_duration)
return failed_scenarios, scenario_telemetries

View File

@@ -9,7 +9,7 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
from krkn import cerberus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
@@ -44,7 +44,6 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
return 1
scenario_telemetry.affected_pods = result
# publish cerberus status
except (RuntimeError, Exception):
logging.error("ContainerScenarioPlugin exiting due to Exception %s")
return 1
@@ -63,6 +62,7 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
def container_killing_in_pod(self, cont_scenario, kubecli: KrknKubernetes):

View File

@@ -25,6 +25,10 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
with open(scenario, "r") as f:
scenario = yaml.full_load(f)
scenario_config = HogConfig.from_yaml_dict(scenario)
# Get node-name if provided
node_name = scenario.get('node-name')
has_selector = True
if not scenario_config.node_selector or not re.match("^.+=.*$", scenario_config.node_selector):
if scenario_config.node_selector:
@@ -33,13 +37,19 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
else:
node_selector = scenario_config.node_selector
available_nodes = lib_telemetry.get_lib_kubernetes().list_nodes(node_selector)
if len(available_nodes) == 0:
raise Exception("no available nodes to schedule workload")
if node_name:
logging.info(f"Using specific node: {node_name}")
all_nodes = lib_telemetry.get_lib_kubernetes().list_nodes("")
if node_name not in all_nodes:
raise Exception(f"Specified node {node_name} not found or not available")
available_nodes = [node_name]
else:
available_nodes = lib_telemetry.get_lib_kubernetes().list_nodes(node_selector)
if len(available_nodes) == 0:
raise Exception("no available nodes to schedule workload")
if not has_selector:
# if selector not specified picks a random node between the available
available_nodes = [available_nodes[random.randint(0, len(available_nodes))]]
if not has_selector:
available_nodes = [available_nodes[random.randint(0, len(available_nodes))]]
if scenario_config.number_of_nodes and len(available_nodes) > scenario_config.number_of_nodes:
available_nodes = random.sample(available_nodes, scenario_config.number_of_nodes)

View File

@@ -0,0 +1,399 @@
import logging
import time
from typing import Dict, Any, Optional
import random
import re
import yaml
from kubernetes.client.rest import ApiException
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import log_exception
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
"""
A scenario plugin that injects chaos by deleting a KubeVirt Virtual Machine Instance (VMI).
This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
"""
def __init__(self):
self.k8s_client = None
self.original_vmi = None
# Scenario type is handled directly in execute_scenario
def get_scenario_types(self) -> list[str]:
return ["kubevirt_vm_outage"]
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
"""
Main entry point for the plugin.
Parses the scenario configuration and executes the chaos scenario.
"""
try:
with open(scenario, "r") as f:
scenario_config = yaml.full_load(f)
self.init_clients(lib_telemetry.get_lib_kubernetes())
pods_status = PodsStatus()
for config in scenario_config["scenarios"]:
if config.get("scenario") == "kubevirt_vm_outage":
single_pods_status = self.execute_scenario(config, scenario_telemetry)
pods_status.merge(single_pods_status)
scenario_telemetry.affected_pods = pods_status
return 0
except Exception as e:
logging.error(f"KubeVirt VM Outage scenario failed: {e}")
log_exception(e)
return 1
def init_clients(self, k8s_client: KrknKubernetes):
"""
Initialize Kubernetes client for KubeVirt operations.
"""
self.k8s_client = k8s_client
self.custom_object_client = k8s_client.custom_object_client
logging.info("Successfully initialized Kubernetes client for KubeVirt operations")
def get_vmi(self, name: str, namespace: str) -> Optional[Dict]:
"""
Get a Virtual Machine Instance by name and namespace.
:param name: Name of the VMI to retrieve
:param namespace: Namespace of the VMI
:return: The VMI object if found, None otherwise
"""
try:
vmi = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=name
)
return vmi
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {name}: {e}")
raise
def get_vmis(self, regex_name: str, namespace: str) -> Optional[Dict]:
"""
Get a Virtual Machine Instance by name and namespace.
:param name: Name of the VMI to retrieve
:param namespace: Namespace of the VMI
:return: The VMI object if found, None otherwise
"""
try:
vmis = self.custom_object_client.list_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
)
vmi_list = []
for vmi in vmis.get("items"):
vmi_name = vmi.get("metadata",{}).get("name")
match = re.match(regex_name, vmi_name)
if match:
vmi_list.append(vmi)
return vmi_list
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {regex_name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {regex_name}: {e}")
raise
def execute_scenario(self, config: Dict[str, Any], scenario_telemetry: ScenarioTelemetry) -> int:
"""
Execute a KubeVirt VM outage scenario based on the provided configuration.
:param config: The scenario configuration
:param scenario_telemetry: The telemetry object for recording metrics
:return: 0 for success, 1 for failure
"""
try:
params = config.get("parameters", {})
vm_name = params.get("vm_name")
namespace = params.get("namespace", "default")
timeout = params.get("timeout", 60)
kill_count = params.get("kill_count", 1)
disable_auto_restart = params.get("disable_auto_restart", False)
self.pods_status = PodsStatus()
if not vm_name:
logging.error("vm_name parameter is required")
return 1
vmis_list = self.get_vmis(vm_name,namespace)
rand_int = random.randint(0, len(vmis_list) - 1)
vmi = vmis_list[rand_int]
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
vmi_name = vmi.get("metadata").get("name")
if not self.validate_environment(vmi_name, namespace):
return 1
vmi = self.get_vmi(vmi_name, namespace)
self.affected_pod = AffectedPod(
pod_name=vmi_name,
namespace=namespace,
)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return 1
self.original_vmi = vmi
logging.info(f"Captured initial state of VMI: {vm_name}")
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
if result != 0:
return self.pods_status
result = self.wait_for_running(vmi_name,namespace, timeout)
if result != 0:
self.recover(vmi_name, namespace)
self.pods_status.unrecovered = self.affected_pod
return self.pods_status
self.affected_pod.total_recovery_time = (
self.affected_pod.pod_readiness_time
+ self.affected_pod.pod_rescheduling_time
)
self.pods_status.recovered.append(self.affected_pod)
logging.info(f"Successfully completed KubeVirt VM outage scenario for VM: {vm_name}")
return self.pods_status
except Exception as e:
logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
log_exception(e)
return 1
def validate_environment(self, vm_name: str, namespace: str) -> bool:
"""
Validate that KubeVirt is installed and the specified VM exists.
:param vm_name: Name of the VM to validate
:param namespace: Namespace of the VM
:return: True if environment is valid, False otherwise
"""
try:
# Check if KubeVirt CRDs exist
crd_list = self.custom_object_client.list_namespaced_custom_object("kubevirt.io","v1",namespace,"virtualmachines")
kubevirt_crds = [crd for crd in crd_list.items() ]
if not kubevirt_crds:
logging.error("KubeVirt CRDs not found. Ensure KubeVirt/CNV is installed in the cluster")
return False
# Check if VMI exists
vmi = self.get_vmi(vm_name, namespace)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return False
logging.info(f"Validated environment: KubeVirt is installed and VMI {vm_name} exists")
return True
except Exception as e:
logging.error(f"Error validating environment: {e}")
return False
def patch_vm_spec(self, vm_name: str, namespace: str, running: bool) -> bool:
"""
Patch the VM spec to enable/disable auto-restart.
:param vm_name: Name of the VM to patch
:param namespace: Namespace of the VM
:param running: Whether the VM should be set to running state
:return: True if patch was successful, False otherwise
"""
try:
# Get the VM object first to get its current spec
vm = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name
)
# Update the running state
if 'spec' not in vm:
vm['spec'] = {}
vm['spec']['running'] = running
# Apply the patch
self.custom_object_client.patch_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name,
body=vm
)
return True
except ApiException as e:
logging.error(f"Failed to patch VM {vm_name}: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error patching VM {vm_name}: {e}")
return False
def delete_vmi(self, vm_name: str, namespace: str, disable_auto_restart: bool = False, timeout: int = 120) -> int:
"""
Delete a Virtual Machine Instance to simulate a VM outage.
:param vm_name: Name of the VMI to delete
:param namespace: Namespace of the VMI
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Injecting chaos: Deleting VMI {vm_name} in namespace {namespace}")
# If auto-restart should be disabled, patch the VM spec first
if disable_auto_restart:
logging.info(f"Disabling auto-restart for VM {vm_name} by setting spec.running=False")
if not self.patch_vm_spec(vm_name, namespace, running=False):
logging.error("Failed to disable auto-restart for VM"
" - proceeding with deletion but VM may auto-restart")
start_creation_time = self.original_vmi.get('metadata', {}).get('creationTimestamp')
start_time = time.time()
try:
self.custom_object_client.delete_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=vm_name
)
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {vm_name} not found during deletion")
return 1
else:
logging.error(f"API error during VMI deletion: {e}")
return 1
# Wait for the VMI to be deleted
while time.time() - start_time < timeout:
deleted_vmi = self.get_vmi(vm_name, namespace)
if deleted_vmi:
if start_creation_time != deleted_vmi.get('metadata', {}).get('creationTimestamp'):
logging.info(f"VMI {vm_name} successfully recreated")
self.affected_pod.pod_rescheduling_time = time.time() - start_time
return 0
else:
logging.info(f"VMI {vm_name} successfully deleted")
time.sleep(1)
logging.error(f"Timed out waiting for VMI {vm_name} to be deleted")
self.pods_status.unrecovered = self.affected_pod
return 1
except Exception as e:
logging.error(f"Error deleting VMI {vm_name}: {e}")
log_exception(e)
self.pods_status.unrecovered = self.affected_pod
return 1
def wait_for_running(self, vm_name: str, namespace: str, timeout: int = 120) -> int:
start_time = time.time()
while time.time() - start_time < timeout:
# Check current state once since we've already waited for the duration
vmi = self.get_vmi(vm_name, namespace)
if vmi:
if vmi.get('status', {}).get('phase') == "Running":
end_time = time.time()
self.affected_pod.pod_readiness_time = end_time - start_time
logging.info(f"VMI {vm_name} is already running")
return 0
logging.info(f"VMI {vm_name} exists but is not in Running state. Current state: {vmi.get('status', {}).get('phase')}")
else:
logging.info(f"VMI {vm_name} not yet recreated")
time.sleep(1)
return 1
def recover(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
"""
Recover a deleted VMI, either by waiting for auto-recovery or manually recreating it.
:param vm_name: Name of the VMI to recover
:param namespace: Namespace of the VMI
:param disable_auto_restart: Whether auto-restart was disabled during injection
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Attempting to recover VMI {vm_name} in namespace {namespace}")
if self.original_vmi:
logging.info(f"Auto-recovery didn't occur for VMI {vm_name}. Attempting manual recreation")
try:
# Clean up server-generated fields
vmi_dict = self.original_vmi.copy()
if 'metadata' in vmi_dict:
metadata = vmi_dict['metadata']
for field in ['resourceVersion', 'uid', 'creationTimestamp', 'generation']:
if field in metadata:
del metadata[field]
# Create the VMI
self.custom_object_client.create_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
body=vmi_dict
)
logging.info(f"Successfully recreated VMI {vm_name}")
# Wait for VMI to start running
self.wait_for_running(vm_name,namespace)
logging.warning(f"VMI {vm_name} was recreated but didn't reach Running state in time")
return 0 # Still consider it a success as the VMI was recreated
except Exception as e:
logging.error(f"Error recreating VMI {vm_name}: {e}")
log_exception(e)
return 1
else:
logging.error(f"Failed to recover VMI {vm_name}: No original state captured and auto-recovery did not occur")
return 1
except Exception as e:
logging.error(f"Unexpected error recovering VMI {vm_name}: {e}")
log_exception(e)
return 1

View File

@@ -1,6 +1,5 @@
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.scenario_plugins.native.plugins import PLUGINS
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from typing import Any
@@ -17,76 +16,23 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
kill_scenarios = [
kill_scenario
for kill_scenario in PLUGINS.unserialize_scenario(scenario)
if kill_scenario["id"] == "kill-pods"
]
try:
self.start_monitoring(pool, kill_scenarios)
PLUGINS.run(
scenario,
lib_telemetry.get_lib_kubernetes().get_kubeconfig_path(),
krkn_config,
run_uuid,
)
result = pool.join()
scenario_telemetry.affected_pods = result
if result.error:
logging.error(f"NativeScenarioPlugin unrecovered pods: {result.error}")
return 1
except Exception as e:
logging.error("NativeScenarioPlugin exiting due to Exception %s" % e)
pool.cancel()
return 1
else:
return 0
def get_scenario_types(self) -> list[str]:
return [
"pod_disruption_scenarios",
"pod_network_scenarios",
"ingress_node_scenarios"
]
def start_monitoring(self, pool: PodsMonitorPool, scenarios: list[Any]):
for kill_scenario in scenarios:
recovery_time = kill_scenario["config"]["krkn_pod_recovery_time"]
if (
"namespace_pattern" in kill_scenario["config"]
and "label_selector" in kill_scenario["config"]
):
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
label_selector = kill_scenario["config"]["label_selector"]
pool.select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
)
logging.info(
f"waiting {recovery_time} seconds for pod recovery, "
f"pod label selector: {label_selector} namespace pattern: {namespace_pattern}"
)
elif (
"namespace_pattern" in kill_scenario["config"]
and "name_pattern" in kill_scenario["config"]
):
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
name_pattern = kill_scenario["config"]["name_pattern"]
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
pod_name_pattern=name_pattern,
namespace_pattern=namespace_pattern,
max_timeout=recovery_time,
)
logging.info(
f"waiting {recovery_time} seconds for pod recovery, "
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
)
else:
raise Exception(
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
)

View File

@@ -28,6 +28,14 @@ class NetworkScenarioConfig:
},
)
image: typing.Annotated[str, validation.min(1)]= field(
default="quay.io/krkn-chaos/krkn:tools",
metadata={
"name": "Image",
"description": "Image of krkn tools to run"
}
)
label_selector: typing.Annotated[
typing.Optional[str], validation.required_if_not("node_interface_name")
] = field(
@@ -142,7 +150,7 @@ class NetworkScenarioErrorOutput:
)
def get_default_interface(node: str, pod_template, cli: CoreV1Api) -> str:
def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -> str:
"""
Function that returns a random interface from a node
@@ -161,7 +169,7 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api) -> str:
Default interface (string) belonging to the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
@@ -189,7 +197,7 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api) -> str:
def verify_interface(
input_interface_list: typing.List[str], node: str, pod_template, cli: CoreV1Api
input_interface_list: typing.List[str], node: str, pod_template, cli: CoreV1Api, image: str
) -> typing.List[str]:
"""
Function that verifies whether a list of interfaces is present in the node.
@@ -212,7 +220,7 @@ def verify_interface(
Returns:
The interface list for the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
try:
@@ -268,6 +276,7 @@ def get_node_interfaces(
instance_count: int,
pod_template,
cli: CoreV1Api,
image: str
) -> typing.Dict[str, typing.List[str]]:
"""
Function that is used to process the input dictionary with the nodes and
@@ -309,7 +318,7 @@ def get_node_interfaces(
nodes = kube_helper.get_node(None, label_selector, instance_count, cli)
node_interface_dict = {}
for node in nodes:
node_interface_dict[node] = get_default_interface(node, pod_template, cli)
node_interface_dict[node] = get_default_interface(node, pod_template, cli, image)
else:
node_name_list = node_interface_dict.keys()
filtered_node_list = []
@@ -321,7 +330,7 @@ def get_node_interfaces(
for node in filtered_node_list:
node_interface_dict[node] = verify_interface(
node_interface_dict[node], node, pod_template, cli
node_interface_dict[node], node, pod_template, cli, image
)
return node_interface_dict
@@ -337,6 +346,7 @@ def apply_ingress_filter(
cli: CoreV1Api,
create_interfaces: bool = True,
param_selector: str = "all",
image:str = "quay.io/krkn-chaos/krkn:tools",
) -> str:
"""
Function that applies the filters to shape incoming traffic to
@@ -382,14 +392,14 @@ def apply_ingress_filter(
network_params = {param_selector: cfg.network_params[param_selector]}
if create_interfaces:
create_virtual_interfaces(cli, interface_list, node, pod_template)
create_virtual_interfaces(cli, interface_list, node, pod_template, image)
exec_cmd = get_ingress_cmd(
interface_list, network_params, duration=cfg.test_duration
)
logging.info("Executing %s on node %s" % (exec_cmd, node))
job_body = yaml.safe_load(
job_template.render(jobname=str(hash(node))[:5], nodename=node, cmd=exec_cmd)
job_template.render(jobname=str(hash(node))[:5], nodename=node, image=image, cmd=exec_cmd)
)
api_response = kube_helper.create_job(batch_cli, job_body)
@@ -400,7 +410,7 @@ def apply_ingress_filter(
def create_virtual_interfaces(
cli: CoreV1Api, interface_list: typing.List[str], node: str, pod_template
cli: CoreV1Api, interface_list: typing.List[str], node: str, pod_template, image: str
) -> None:
"""
Function that creates a privileged pod and uses it to create
@@ -421,7 +431,7 @@ def create_virtual_interfaces(
- The YAML template used to instantiate a pod to create
virtual interfaces on the node
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
logging.info(
"Creating {0} virtual interfaces on node {1} using a pod".format(
@@ -434,7 +444,7 @@ def create_virtual_interfaces(
def delete_virtual_interfaces(
cli: CoreV1Api, node_list: typing.List[str], pod_template
cli: CoreV1Api, node_list: typing.List[str], pod_template, image: str
):
"""
Function that creates a privileged pod and uses it to delete all
@@ -457,7 +467,7 @@ def delete_virtual_interfaces(
"""
for node in node_list:
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
logging.info("Deleting all virtual interfaces on node {0}".format(node))
delete_ifb(cli, "modtools")
@@ -700,7 +710,7 @@ def network_chaos(
pod_interface_template = env.get_template("pod_interface.j2")
pod_module_template = env.get_template("pod_module.j2")
cli, batch_cli = kube_helper.setup_kubernetes(cfg.kubeconfig_path)
test_image = cfg.image
logging.info("Starting Ingress Network Chaos")
try:
node_interface_dict = get_node_interfaces(
@@ -709,6 +719,7 @@ def network_chaos(
cfg.instance_count,
pod_interface_template,
cli,
test_image
)
except Exception:
return "error", NetworkScenarioErrorOutput(format_exc())
@@ -726,6 +737,7 @@ def network_chaos(
job_template,
batch_cli,
cli,
test_image
)
)
logging.info("Waiting for parallel job to finish")
@@ -746,6 +758,7 @@ def network_chaos(
cli,
create_interfaces=create_interfaces,
param_selector=param,
image=test_image
)
)
logging.info("Waiting for serial job to finish")
@@ -772,6 +785,6 @@ def network_chaos(
logging.error("Ingress Network Chaos exiting due to Exception - %s" % e)
return "error", NetworkScenarioErrorOutput(format_exc())
finally:
delete_virtual_interfaces(cli, node_interface_dict.keys(), pod_module_template)
delete_virtual_interfaces(cli, node_interface_dict.keys(), pod_module_template, test_image)
logging.info("Deleting jobs(if any)")
delete_jobs(cli, batch_cli, job_list[:])

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: {{image}}
command: ["/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true
@@ -22,4 +22,4 @@ spec:
hostPath:
path: /lib/modules
restartPolicy: Never
backoffLimit: 0
backoffLimit: 0

View File

@@ -7,7 +7,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: fedtools
image: docker.io/fedora/tools
image: {{image}}
command:
- /bin/sh
- -c

View File

@@ -6,7 +6,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: modtools
image: docker.io/fedora/tools
image: {{image}}
imagePullPolicy: IfNotPresent
command:
- /bin/sh
@@ -27,4 +27,4 @@ spec:
hostNetwork: true
hostIPC: true
hostPID: true
restartPolicy: Never
restartPolicy: Never

View File

@@ -4,7 +4,6 @@ import logging
from os.path import abspath
from typing import List, Any, Dict
from krkn.scenario_plugins.native.run_python_plugin import run_python_file
from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
from krkn.scenario_plugins.native.network.ingress_shaping import network_chaos
from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin import (
pod_outage,
@@ -148,13 +147,6 @@ class Plugins:
PLUGINS = Plugins(
[
PluginStep(
kill_pods,
[
"error",
],
),
PluginStep(wait_for_pods, ["error"]),
PluginStep(run_python_file, ["error"]),
PluginStep(network_chaos, ["error"]),
PluginStep(pod_outage, ["error"]),

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: {{image}}
command: ["chroot", "/host", "/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true

View File

@@ -6,7 +6,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: modtools
image: docker.io/fedora/tools
image: {{image}}
imagePullPolicy: IfNotPresent
command:
- /bin/sh
@@ -27,4 +27,4 @@ spec:
hostNetwork: true
hostIPC: true
hostPID: true
restartPolicy: Never
restartPolicy: Never

View File

@@ -192,6 +192,7 @@ def apply_outage_policy(
duration: str,
bridge_name: str,
kubecli: KrknKubernetes,
image: str
) -> typing.List[str]:
"""
Function that applies filters(ingress or egress) to block traffic.
@@ -223,6 +224,8 @@ def apply_outage_policy(
batch_cli (BatchV1Api)
- Object to interact with Kubernetes Python client's BatchV1Api API
image (string)
- Image of network chaos tool
Returns:
The name of the job created that executes the commands on a node
for ingress chaos scenario
@@ -239,7 +242,7 @@ def apply_outage_policy(
br = "br-int"
table = 8
for node, ips in node_dict.items():
while len(check_cookie(node, pod_template, br, cookie, kubecli)) > 2 or cookie in cookie_list:
while len(check_cookie(node, pod_template, br, cookie, kubecli, image)) > 2 or cookie in cookie_list:
cookie = random.randint(100, 10000)
exec_cmd = ""
for ip in ips:
@@ -257,6 +260,7 @@ def apply_outage_policy(
job_template.render(
jobname=str(hash(node))[:5] + str(random.randint(0, 10000)),
nodename=node,
image=image,
cmd=exec_cmd,
)
)
@@ -281,6 +285,7 @@ def apply_ingress_policy(
bridge_name: str,
kubecli: KrknKubernetes,
test_execution: str,
image: str,
) -> typing.List[str]:
"""
Function that applies ingress traffic shaping to pod interface.
@@ -327,22 +332,23 @@ def apply_ingress_policy(
job_list = []
yml_list = []
create_virtual_interfaces(kubecli, len(ips), node, pod_template)
create_virtual_interfaces(kubecli, len(ips), node, pod_template, image)
for count, pod_ip in enumerate(set(ips)):
pod_inf = get_pod_interface(node, pod_ip, pod_template, bridge_name, kubecli)
pod_inf = get_pod_interface(node, pod_ip, pod_template, bridge_name, kubecli, image)
exec_cmd = get_ingress_cmd(
test_execution, pod_inf, mod, count, network_params, duration
)
logging.info("Executing %s on pod %s in node %s" % (exec_cmd, pod_ip, node))
job_body = yaml.safe_load(
job_template.render(jobname=mod + str(pod_ip), nodename=node, cmd=exec_cmd)
job_template.render(jobname=mod + str(pod_ip), nodename=node, image=image, cmd=exec_cmd)
)
yml_list.append(job_body)
if pod_ip == node:
break
for job_body in yml_list:
print('jbo body' + str(job_body))
api_response = kubecli.create_job(job_body)
if api_response is None:
raise Exception("Error creating job")
@@ -362,6 +368,7 @@ def apply_net_policy(
bridge_name: str,
kubecli: KrknKubernetes,
test_execution: str,
image: str,
) -> typing.List[str]:
"""
Function that applies egress traffic shaping to pod interface.
@@ -415,7 +422,7 @@ def apply_net_policy(
)
logging.info("Executing %s on pod %s in node %s" % (exec_cmd, pod_ip, node))
job_body = yaml.safe_load(
job_template.render(jobname=mod + str(pod_ip), nodename=node, cmd=exec_cmd)
job_template.render(jobname=mod + str(pod_ip), nodename=node, image=image, cmd=exec_cmd)
)
yml_list.append(job_body)
@@ -530,7 +537,7 @@ def get_egress_cmd(
def create_virtual_interfaces(
kubecli: KrknKubernetes, nummber: int, node: str, pod_template
kubecli: KrknKubernetes, nummber: int, node: str, pod_template, image: str,
) -> None:
"""
Function that creates a privileged pod and uses it to create
@@ -550,8 +557,11 @@ def create_virtual_interfaces(
pod_template (jinja2.environment.Template))
- The YAML template used to instantiate a pod to create
virtual interfaces on the node
image (string)
- Image of network chaos tool
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
logging.info(
"Creating {0} virtual interfaces on node {1} using a pod".format(nummber, node)
@@ -562,7 +572,7 @@ def create_virtual_interfaces(
def delete_virtual_interfaces(
kubecli: KrknKubernetes, node_list: typing.List[str], pod_template
kubecli: KrknKubernetes, node_list: typing.List[str], pod_template, image: str,
):
"""
Function that creates a privileged pod and uses it to delete all
@@ -582,10 +592,13 @@ def delete_virtual_interfaces(
pod_template (jinja2.environment.Template))
- The YAML template used to instantiate a pod to delete
virtual interfaces on the node
image (string)
- Image of network chaos tool
"""
for node in node_list:
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
logging.info("Deleting all virtual interfaces on node {0}".format(node))
delete_ifb(kubecli, "modtools")
@@ -619,7 +632,7 @@ def delete_ifb(kubecli: KrknKubernetes, pod_name: str):
kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot")
def list_bridges(node: str, pod_template, kubecli: KrknKubernetes) -> typing.List[str]:
def list_bridges(node: str, pod_template, kubecli: KrknKubernetes, image: str) -> typing.List[str]:
"""
Function that returns a list of bridges on the node
@@ -634,11 +647,13 @@ def list_bridges(node: str, pod_template, kubecli: KrknKubernetes) -> typing.Lis
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
image (string)
- Image of network chaos tool
Returns:
List of bridges on the node.
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
logging.info("Creating pod to query bridge on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
@@ -662,7 +677,7 @@ def list_bridges(node: str, pod_template, kubecli: KrknKubernetes) -> typing.Lis
def check_cookie(
node: str, pod_template, br_name, cookie, kubecli: KrknKubernetes
node: str, pod_template, br_name, cookie, kubecli: KrknKubernetes, image: str
) -> str:
"""
Function to check for matching flow rules
@@ -684,11 +699,13 @@ def check_cookie(
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
image (string)
- Image of network chaos tool
Returns
Returns the matching flow rules
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
logging.info("Creating pod to query duplicate rules on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
@@ -721,7 +738,7 @@ def check_cookie(
def get_pod_interface(
node: str, ip: str, pod_template, br_name, kubecli: KrknKubernetes
node: str, ip: str, pod_template, br_name, kubecli: KrknKubernetes, image: str = "quay.io/krkn-chaos/krkn:tools"
) -> str:
"""
Function to query the pod interface on a node
@@ -747,7 +764,7 @@ def get_pod_interface(
Returns the pod interface name
"""
pod_body = yaml.safe_load(pod_template.render(nodename=node))
pod_body = yaml.safe_load(pod_template.render(nodename=node, image=image))
logging.info("Creating pod to query pod interface on node %s" % node)
kubecli.create_pod(pod_body, "default", 300)
inf = ""
@@ -788,7 +805,8 @@ def get_pod_interface(
def check_bridge_interface(
node_name: str, pod_template, bridge_name: str, kubecli: KrknKubernetes
node_name: str, pod_template, bridge_name: str, kubecli: KrknKubernetes,
image: str = "quay.io/krkn-chaos/krkn:tools"
) -> bool:
"""
Function is used to check if the required OVS or OVN bridge is found in
@@ -814,7 +832,7 @@ def check_bridge_interface(
nodes = kubecli.get_node(node_name, None, 1)
node_bridge = []
for node in nodes:
node_bridge = list_bridges(node, pod_template, kubecli)
node_bridge = list_bridges(node, pod_template, kubecli, image=image)
if bridge_name not in node_bridge:
raise Exception(f"OVS bridge {bridge_name} not found on the node ")
@@ -835,6 +853,14 @@ class InputParams:
}
)
image: typing.Annotated[str, validation.min(1)]= field(
default="quay.io/krkn-chaos/krkn:tools",
metadata={
"name": "Image",
"description": "Image of krkn tools to run"
}
)
direction: typing.List[str] = field(
default_factory=lambda: ["ingress", "egress"],
metadata={
@@ -1004,6 +1030,7 @@ def pod_outage(
test_namespace = params.namespace
test_label_selector = params.label_selector
test_pod_name = params.pod_name
test_image = params.image
filter_dict = {}
job_list = []
publish = False
@@ -1040,7 +1067,7 @@ def pod_outage(
label_set.add("%s=%s" % (key, value))
check_bridge_interface(
list(node_dict.keys())[0], pod_module_template, br_name, kubecli
list(node_dict.keys())[0], pod_module_template, br_name, kubecli, test_image
)
for direction, ports in filter_dict.items():
@@ -1055,6 +1082,7 @@ def pod_outage(
params.test_duration,
br_name,
kubecli,
test_image
)
)
@@ -1095,7 +1123,16 @@ class EgressParams:
}
)
image: typing.Annotated[str, validation.min(1)]= field(
default="quay.io/krkn-chaos/krkn:tools",
metadata={
"name": "Image",
"description": "Image of krkn tools to run"
}
)
network_params: typing.Dict[str, str] = field(
default=None,
metadata={
"name": "Network Parameters",
"description": "The network filters that are applied on the interface. "
@@ -1254,6 +1291,7 @@ def pod_egress_shaping(
test_namespace = params.namespace
test_label_selector = params.label_selector
test_pod_name = params.pod_name
test_image = params.image
job_list = []
publish = False
@@ -1287,7 +1325,7 @@ def pod_egress_shaping(
label_set.add("%s=%s" % (key, value))
check_bridge_interface(
list(node_dict.keys())[0], pod_module_template, br_name, kubecli
list(node_dict.keys())[0], pod_module_template, br_name, kubecli, test_image
)
for mod in mod_lst:
@@ -1304,6 +1342,7 @@ def pod_egress_shaping(
br_name,
kubecli,
params.execution_type,
test_image
)
)
if params.execution_type == "serial":
@@ -1357,8 +1396,17 @@ class IngressParams:
"for details.",
}
)
image: typing.Annotated[str, validation.min(1)] = field(
default="quay.io/krkn-chaos/krkn:tools",
metadata={
"name": "Image",
"description": "Image to use for injecting network chaos",
}
)
network_params: typing.Dict[str, str] = field(
default=None,
metadata={
"name": "Network Parameters",
"description": "The network filters that are applied on the interface. "
@@ -1518,6 +1566,7 @@ def pod_ingress_shaping(
test_namespace = params.namespace
test_label_selector = params.label_selector
test_pod_name = params.pod_name
test_image = params.image
job_list = []
publish = False
@@ -1551,7 +1600,7 @@ def pod_ingress_shaping(
label_set.add("%s=%s" % (key, value))
check_bridge_interface(
list(node_dict.keys())[0], pod_module_template, br_name, kubecli
list(node_dict.keys())[0], pod_module_template, br_name, kubecli, test_image
)
for mod in mod_lst:
@@ -1568,6 +1617,7 @@ def pod_ingress_shaping(
br_name,
kubecli,
params.execution_type,
image=test_image
)
)
if params.execution_type == "serial":
@@ -1604,6 +1654,6 @@ def pod_ingress_shaping(
logging.error("Pod network Shaping scenario exiting due to Exception - %s" % e)
return "error", PodIngressNetShapingErrorOutput(format_exc())
finally:
delete_virtual_interfaces(kubecli, node_dict.keys(), pod_module_template)
delete_virtual_interfaces(kubecli, node_dict.keys(), pod_module_template, test_image)
logging.info("Deleting jobs(if any)")
delete_jobs(kubecli, job_list[:])

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: {{image}}
command: ["/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true

View File

@@ -42,7 +42,9 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
test_egress = get_yaml_item_value(
test_dict, "egress", {"bandwidth": "100mbit"}
)
test_image = get_yaml_item_value(
test_dict, "image", "quay.io/krkn-chaos/krkn:tools"
)
if test_node:
node_name_list = test_node.split(",")
nodelst = common_node_functions.get_node_by_name(node_name_list, lib_telemetry.get_lib_kubernetes())
@@ -60,6 +62,7 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
nodelst,
pod_template,
lib_telemetry.get_lib_kubernetes(),
image=test_image
)
joblst = []
egress_lst = [i for i in param_lst if i in test_egress]
@@ -71,6 +74,7 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
"execution": test_execution,
"instance_count": test_instance_count,
"egress": test_egress,
"image": test_image
}
}
logging.info(
@@ -94,6 +98,7 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
jobname=i + str(hash(node))[:5],
nodename=node,
cmd=exec_cmd,
image=test_image
)
)
joblst.append(job_body["metadata"]["name"])
@@ -153,10 +158,10 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
return 0
def verify_interface(
self, test_interface, nodelst, template, kubecli: KrknKubernetes
self, test_interface, nodelst, template, kubecli: KrknKubernetes, image: str
):
pod_index = random.randint(0, len(nodelst) - 1)
pod_body = yaml.safe_load(template.render(nodename=nodelst[pod_index]))
pod_body = yaml.safe_load(template.render(nodename=nodelst[pod_index], image=image))
logging.info("Creating pod to query interface on node %s" % nodelst[pod_index])
kubecli.create_pod(pod_body, "default", 300)
try:
@@ -177,7 +182,7 @@ class NetworkChaosScenarioPlugin(AbstractScenarioPlugin):
raise RuntimeError()
return test_interface
finally:
logging.info("Deleteing pod to query interface on node")
logging.info("Deleting pod to query interface on node")
kubecli.delete_pod("fedtools", "default")
# krkn_lib

View File

@@ -7,7 +7,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: fedtools
image: docker.io/fedora/tools
image: {{image}}
command:
- /bin/sh
- -c

View File

@@ -6,6 +6,7 @@ class NetworkChaosScenarioType(Enum):
Node = 1
Pod = 2
@dataclass
class BaseNetworkChaosConfig:
supported_execution = ["serial", "parallel"]
@@ -13,20 +14,31 @@ class BaseNetworkChaosConfig:
wait_duration: int
test_duration: int
label_selector: str
service_account: str
instance_count: int
execution: str
namespace: str
taints: list[str]
def validate(self) -> list[str]:
errors = []
if self.execution is None:
errors.append(f"execution cannot be None, supported values are: {','.join(self.supported_execution)}")
errors.append(
f"execution cannot be None, supported values are: {','.join(self.supported_execution)}"
)
if self.execution not in self.supported_execution:
errors.append(f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}")
if self.label_selector is None:
errors.append(
f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}"
)
if self.id == "node_network_filter" and self.label_selector is None:
errors.append("label_selector cannot be None")
if not isinstance(self.wait_duration, int):
errors.append("wait_duration must be an int")
if not isinstance(self.test_duration, int):
errors.append("test_duration must be an int")
return errors
@dataclass
class NetworkFilterConfig(BaseNetworkChaosConfig):
ingress: bool
@@ -34,8 +46,15 @@ class NetworkFilterConfig(BaseNetworkChaosConfig):
interfaces: list[str]
target: str
ports: list[int]
image: str
protocols: list[str]
def validate(self) -> list[str]:
errors = super().validate()
# here further validations
allowed_protocols = {"tcp", "udp"}
if not set(self.protocols).issubset(allowed_protocols):
errors.append(
f"{self.protocols} contains not allowed protocols only tcp and udp is allowed"
)
return errors

View File

@@ -3,19 +3,25 @@ import logging
import queue
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import BaseNetworkChaosConfig, NetworkChaosScenarioType
from krkn.scenario_plugins.network_chaos_ng.models import (
BaseNetworkChaosConfig,
NetworkChaosScenarioType,
)
class AbstractNetworkChaosModule(abc.ABC):
"""
The abstract class that needs to be implemented by each Network Chaos Scenario
"""
kubecli: KrknTelemetryOpenshift
base_network_config: BaseNetworkChaosConfig
@abc.abstractmethod
def run(self, target: str, kubecli: KrknTelemetryOpenshift, error_queue: queue.Queue = None):
def run(self, target: str, error_queue: queue.Queue = None):
"""
the entrypoint method for the Network Chaos Scenario
:param target: The resource name that will be targeted by the scenario (Node Name, Pod Name etc.)
:param kubecli: The `KrknTelemetryOpenshift` needed by the scenario to access to the krkn-lib methods
:param error_queue: A queue that will be used by the plugin to push the errors raised during the execution of parallel modules
"""
pass
@@ -28,31 +34,17 @@ class AbstractNetworkChaosModule(abc.ABC):
"""
pass
def get_targets(self) -> list[str]:
"""
checks and returns the targets based on the common scenario configuration
"""
def log_info(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for INFO severity to be used in the scenarios
"""
if parallel:
logging.info(f"[{node_name}]: {message}")
else:
logging.info(message)
pass
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for WARNING severity to be used in the scenarios
"""
if parallel:
logging.warning(f"[{node_name}]: {message}")
else:
logging.warning(message)
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for ERROR severity to be used in the scenarios
"""
if parallel:
logging.error(f"[{node_name}]: {message}")
else:
logging.error(message)
def __init__(
self,
base_network_config: BaseNetworkChaosConfig,
kubecli: KrknTelemetryOpenshift,
):
self.kubecli = kubecli
self.base_network_config = base_network_config

View File

@@ -1,11 +1,6 @@
import os
import queue
import time
import yaml
from jinja2 import Environment, FileSystemLoader
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
@@ -16,87 +11,92 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
apply_network_rules,
clean_network_rules,
generate_rules,
get_default_interface,
)
class NodeNetworkFilterModule(AbstractNetworkChaosModule):
config: NetworkFilterConfig
kubecli: KrknTelemetryOpenshift
def run(
self,
target: str,
kubecli: KrknTelemetryOpenshift,
error_queue: queue.Queue = None,
):
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_name = f"node-filter-{get_random_string(5)}"
pod_template = env.get_template("templates/network-chaos.j2")
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=self.config.namespace,
host_network=True,
target=target,
)
)
self.log_info(
f"creating pod to filter "
log_info(
f"creating workload to filter node {target} network"
f"ports {','.join([str(port) for port in self.config.ports])}, "
f"ingress:{str(self.config.ingress)}, "
f"egress:{str(self.config.egress)}",
parallel,
target,
)
kubecli.get_lib_kubernetes().create_pod(
pod_body, self.config.namespace, 300
pod_name = f"node-filter-{get_random_string(5)}"
deploy_network_filter_pod(
self.config,
target,
pod_name,
self.kubecli.get_lib_kubernetes(),
)
if len(self.config.interfaces) == 0:
interfaces = [
self.get_default_interface(pod_name, self.config.namespace, kubecli)
get_default_interface(
pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
)
]
self.log_info(f"detected default interface {interfaces[0]}")
log_info(
f"detected default interface {interfaces[0]}", parallel, target
)
else:
interfaces = self.config.interfaces
input_rules, output_rules = self.generate_rules(interfaces)
input_rules, output_rules = generate_rules(interfaces, self.config)
for rule in input_rules:
self.log_info(f"applying iptables INPUT rule: {rule}", parallel, target)
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[rule], pod_name, self.config.namespace
)
for rule in output_rules:
self.log_info(
f"applying iptables OUTPUT rule: {rule}", parallel, target
)
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[rule], pod_name, self.config.namespace
)
self.log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules"
apply_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
parallel,
target,
)
log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
parallel,
target,
)
time.sleep(self.config.test_duration)
self.log_info("removing iptables rules")
for _ in input_rules:
# always deleting the first rule since has been inserted from the top
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[f"iptables -D INPUT 1"], pod_name, self.config.namespace
)
for _ in output_rules:
# always deleting the first rule since has been inserted from the top
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[f"iptables -D OUTPUT 1"], pod_name, self.config.namespace
)
self.log_info(
f"deleting network chaos pod {pod_name} from {self.config.namespace}"
log_info("removing iptables rules", parallel, target)
clean_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
)
kubecli.get_lib_kubernetes().delete_pod(pod_name, self.config.namespace)
self.kubecli.get_lib_kubernetes().delete_pod(
pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
@@ -104,33 +104,25 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
else:
error_queue.put(str(e))
def __init__(self, config: NetworkFilterConfig):
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
return NetworkChaosScenarioType.Node, self.config
def get_default_interface(
self, pod_name: str, namespace: str, kubecli: KrknTelemetryOpenshift
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[cmd], pod_name, namespace
)
return output.replace("\n", "")
def get_targets(self) -> list[str]:
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_nodes(
self.base_network_config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
if self.config.target not in node_info:
raise Exception(f"node {self.config.target} not found, aborting")
def generate_rules(self, interfaces: list[str]) -> (list[str], list[str]):
input_rules = []
output_rules = []
for interface in interfaces:
for port in self.config.ports:
if self.config.egress:
output_rules.append(
f"iptables -I OUTPUT 1 -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
if self.config.ingress:
input_rules.append(
f"iptables -I INPUT 1 -i {interface} -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
return input_rules, output_rules
return [self.config.target]

View File

@@ -0,0 +1,177 @@
import queue
import time
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
NetworkFilterConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
get_default_interface,
generate_namespaced_rules,
apply_network_rules,
clean_network_rules_namespaced,
)
class PodNetworkFilterModule(AbstractNetworkChaosModule):
config: NetworkFilterConfig
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
pod_name = f"pod-filter-{get_random_string(5)}"
container_name = f"fedora-container-{get_random_string(5)}"
pod_info = self.kubecli.get_lib_kubernetes().get_pod_info(
target, self.config.namespace
)
log_info(
f"creating workload to filter pod {self.config.target} network"
f"ports {','.join([str(port) for port in self.config.ports])}, "
f"ingress:{str(self.config.ingress)}, "
f"egress:{str(self.config.egress)}",
parallel,
pod_name,
)
if not pod_info:
raise Exception(
f"impossible to retrieve infos for pod {self.config.target} namespace {self.config.namespace}"
)
deploy_network_filter_pod(
self.config,
pod_info.nodeName,
pod_name,
self.kubecli.get_lib_kubernetes(),
container_name,
)
if len(self.config.interfaces) == 0:
interfaces = [
get_default_interface(
pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
)
]
log_info(
f"detected default interface {interfaces[0]}",
parallel,
pod_name,
)
else:
interfaces = self.config.interfaces
container_ids = self.kubecli.get_lib_kubernetes().get_container_ids(
target, self.config.namespace
)
if len(container_ids) == 0:
raise Exception(
f"impossible to resolve container id for pod {target} namespace {self.config.namespace}"
)
log_info(f"targeting container {container_ids[0]}", parallel, pod_name)
pids = self.kubecli.get_lib_kubernetes().get_pod_pids(
base_pod_name=pod_name,
base_pod_namespace=self.config.namespace,
base_pod_container_name=container_name,
pod_name=target,
pod_namespace=self.config.namespace,
pod_container_id=container_ids[0],
)
if not pids:
raise Exception(f"impossible to resolve pid for pod {target}")
log_info(
f"resolved pids {pids} in node {pod_info.nodeName} for pod {target}",
parallel,
pod_name,
)
input_rules, output_rules = generate_namespaced_rules(
interfaces, self.config, pids
)
apply_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
parallel,
target,
)
log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
parallel,
pod_name,
)
time.sleep(self.config.test_duration)
log_info("removing iptables rules", parallel, pod_name)
clean_network_rules_namespaced(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
pids,
)
self.kubecli.get_lib_kubernetes().delete_pod(
pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
raise e
else:
error_queue.put(str(e))
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
return NetworkChaosScenarioType.Pod, self.config
def get_targets(self) -> list[str]:
if not self.config.namespace:
raise Exception("namespace not specified, aborting")
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_pods(
self.config.namespace, self.config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
self.config.target, self.config.namespace
):
raise Exception(
f"pod {self.config.target} not found in namespace {self.config.namespace}"
)
return [self.config.target]

View File

@@ -4,14 +4,29 @@ metadata:
name: {{pod_name}}
namespace: {{namespace}}
spec:
{% if service_account %}
serviceAccountName: {{ service_account }}
{%endif%}
{% if host_network %}
hostNetwork: true
{%endif%}
{% if taints %}
tolerations:
{% for toleration in taints %}
- key: "{{ toleration.key }}"
operator: "{{ toleration.operator }}"
{% if toleration.value %}
value: "{{ toleration.value }}"
{% endif %}
effect: "{{ toleration.effect }}"
{% endfor %}
{% endif %}
hostPID: true
nodeSelector:
kubernetes.io/hostname: {{target}}
containers:
- name: fedora
- name: {{container_name}}
imagePullPolicy: Always
image: quay.io/krkn-chaos/krkn-network-chaos:latest
image: {{workload_image}}
securityContext:
privileged: true

View File

@@ -0,0 +1,31 @@
import logging
def log_info(message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for INFO severity to be used in the scenarios
"""
if parallel:
logging.info(f"[{node_name}]: {message}")
else:
logging.info(message)
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for ERROR severity to be used in the scenarios
"""
if parallel:
logging.error(f"[{node_name}]: {message}")
else:
logging.error(message)
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for WARNING severity to be used in the scenarios
"""
if parallel:
logging.warning(f"[{node_name}]: {message}")
else:
logging.warning(message)

View File

@@ -0,0 +1,161 @@
import os
import yaml
from jinja2 import FileSystemLoader, Environment
from krkn_lib.k8s import KrknKubernetes
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
def generate_rules(
interfaces: list[str], config: NetworkFilterConfig
) -> (list[str], list[str]):
input_rules = []
output_rules = []
for interface in interfaces:
for port in config.ports:
if config.egress:
for protocol in set(config.protocols):
output_rules.append(
f"iptables -I OUTPUT 1 -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
if config.ingress:
for protocol in set(config.protocols):
input_rules.append(
f"iptables -I INPUT 1 -i {interface} -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
return input_rules, output_rules
def generate_namespaced_rules(
interfaces: list[str], config: NetworkFilterConfig, pids: list[str]
) -> (list[str], list[str]):
namespaced_input_rules: list[str] = []
namespaced_output_rules: list[str] = []
input_rules, output_rules = generate_rules(interfaces, config)
for pid in pids:
ns_input_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
]
ns_output_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
]
namespaced_input_rules.extend(ns_input_rules)
namespaced_output_rules.extend(ns_output_rules)
return namespaced_input_rules, namespaced_output_rules
def deploy_network_filter_pod(
config: NetworkFilterConfig,
target_node: str,
pod_name: str,
kubecli: KrknKubernetes,
container_name: str = "fedora",
):
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("templates/network-chaos.j2")
tolerations = []
for taint in config.taints:
key_value_part, effect = taint.split(":", 1)
if "=" in key_value_part:
key, value = key_value_part.split("=", 1)
operator = "Equal"
else:
key = key_value_part
value = None
operator = "Exists"
toleration = {
"key": key,
"operator": operator,
"effect": effect,
}
if value is not None:
toleration["value"] = value
tolerations.append(toleration)
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=config.namespace,
host_network=True,
target=target_node,
container_name=container_name,
workload_image=config.image,
taints=tolerations,
service_account=config.service_account
)
)
kubecli.create_pod(pod_body, config.namespace, 300)
def apply_network_rules(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
parallel: bool,
node_name: str,
):
for rule in input_rules:
log_info(f"applying iptables INPUT rule: {rule}", parallel, node_name)
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
for rule in output_rules:
log_info(f"applying iptables OUTPUT rule: {rule}", parallel, node_name)
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
def clean_network_rules(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
):
for _ in input_rules:
# always deleting the first rule since has been inserted from the top
kubecli.exec_cmd_in_pod([f"iptables -D INPUT 1"], pod_name, namespace)
for _ in output_rules:
# always deleting the first rule since has been inserted from the top
kubecli.exec_cmd_in_pod([f"iptables -D OUTPUT 1"], pod_name, namespace)
def clean_network_rules_namespaced(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
pids: list[str],
):
for _ in input_rules:
for pid in pids:
# always deleting the first rule since has been inserted from the top
kubecli.exec_cmd_in_pod(
[f"nsenter --target {pid} --net -- iptables -D INPUT 1"],
pod_name,
namespace,
)
for _ in output_rules:
for pid in pids:
# always deleting the first rule since has been inserted from the top
kubecli.exec_cmd_in_pod(
[f"nsenter --target {pid} --net -- iptables -D OUTPUT 1"],
pod_name,
namespace,
)
def get_default_interface(
pod_name: str, namespace: str, kubecli: KrknKubernetes
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
return output.replace("\n", "")

View File

@@ -1,14 +1,25 @@
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import AbstractNetworkChaosModule
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import NodeNetworkFilterModule
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import (
NodeNetworkFilterModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_filter import (
PodNetworkFilterModule,
)
supported_modules = ["node_network_filter", "pod_network_filter"]
supported_modules = ["node_network_filter"]
class NetworkChaosFactory:
@staticmethod
def get_instance(config: dict[str, str]) -> AbstractNetworkChaosModule:
def get_instance(
config: dict[str, str], kubecli: KrknTelemetryOpenshift
) -> AbstractNetworkChaosModule:
if config["id"] is None:
raise Exception("network chaos id cannot be None")
if config["id"] not in supported_modules:
@@ -19,6 +30,10 @@ class NetworkChaosFactory:
errors = config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return NodeNetworkFilterModule(config)
return NodeNetworkFilterModule(config, kubecli)
if config["id"] == "pod_network_filter":
config = NetworkFilterConfig(**config)
errors = config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return PodNetworkFilterModule(config, kubecli)

View File

@@ -9,10 +9,6 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
@@ -39,56 +35,52 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
)
return 1
for config in scenario_config:
network_chaos = NetworkChaosFactory.get_instance(config)
network_chaos_config = network_chaos.get_config()
logging.info(
f"running network_chaos scenario: {network_chaos_config[1].id}"
network_chaos = NetworkChaosFactory.get_instance(
config, lib_telemetry
)
if network_chaos_config[0] == NetworkChaosScenarioType.Node:
targets = lib_telemetry.get_lib_kubernetes().list_nodes(
network_chaos_config[1].label_selector
)
else:
targets = lib_telemetry.get_lib_kubernetes().list_pods(
network_chaos_config[1].namespace,
network_chaos_config[1].label_selector,
)
network_chaos_type, network_chaos_config = (
network_chaos.get_config()
)
logging.info(
f"running network_chaos scenario: {network_chaos_config.id}"
)
targets = network_chaos.get_targets()
if len(targets) == 0:
logging.warning(
f"no targets found for {network_chaos_config[1].id} "
f"network chaos scenario with selector {network_chaos_config[1].label_selector} "
f"with target type {network_chaos_config[0]}"
f"no targets found for {network_chaos_config.id} "
f"network chaos scenario with selector {network_chaos_config.label_selector} "
f"with target type {network_chaos_type}"
)
if network_chaos_config[1].instance_count != 0 and network_chaos_config[1].instance_count > len(targets):
targets = random.sample(targets, network_chaos_config[1].instance_count)
if (
network_chaos_config.instance_count != 0
and network_chaos_config.instance_count > len(targets)
):
targets = random.sample(
targets, network_chaos_config.instance_count
)
if network_chaos_config[1].execution == "parallel":
self.run_parallel(targets, network_chaos, lib_telemetry)
if network_chaos_config.execution == "parallel":
self.run_parallel(targets, network_chaos)
else:
self.run_serial(targets, network_chaos, lib_telemetry)
self.run_serial(targets, network_chaos)
if len(config) > 1:
logging.info(f"waiting {network_chaos_config[1].wait_duration} seconds before running the next "
f"Network Chaos NG Module")
time.sleep(network_chaos_config[1].wait_duration)
logging.info(
f"waiting {network_chaos_config.wait_duration} seconds before running the next "
f"Network Chaos NG Module"
)
time.sleep(network_chaos_config.wait_duration)
except Exception as e:
logging.error(str(e))
return 1
return 0
def run_parallel(
self,
targets: list[str],
module: AbstractNetworkChaosModule,
lib_telemetry: KrknTelemetryOpenshift,
):
def run_parallel(self, targets: list[str], module: AbstractNetworkChaosModule):
error_queue = queue.Queue()
threads = []
errors = []
for target in targets:
thread = threading.Thread(
target=module.run, args=[target, lib_telemetry, error_queue]
)
thread = threading.Thread(target=module.run, args=[target, error_queue])
thread.start()
threads.append(thread)
for thread in threads:
@@ -103,14 +95,9 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
f"module {module.get_config()[1].id} execution failed: [{';'.join(errors)}]"
)
def run_serial(
self,
targets: list[str],
module: AbstractNetworkChaosModule,
lib_telemetry: KrknTelemetryOpenshift,
):
def run_serial(self, targets: list[str], module: AbstractNetworkChaosModule):
for target in targets:
module.run(target, lib_telemetry)
module.run(target)
def get_scenario_types(self) -> list[str]:
return ["network_chaos_ng_scenarios"]

View File

@@ -84,7 +84,7 @@ class abstract_node_scenarios:
)
logging.error("stop_kubelet_scenario injection failed!")
raise e
self.add_affected_node(affected_node)
self.affected_nodes_status.affected_nodes.append(affected_node)
# Node scenario to stop and start the kubelet
def stop_start_kubelet_scenario(self, instance_kill_count, node, timeout):
@@ -106,7 +106,6 @@ class abstract_node_scenarios:
+ node
+ " -- chroot /host systemctl restart kubelet &"
)
nodeaction.wait_for_not_ready_status(node, timeout, self.kubecli, affected_node)
nodeaction.wait_for_ready_status(node, timeout, self.kubecli,affected_node)
logging.info("The kubelet of the node %s has been restarted" % (node))
logging.info("restart_kubelet_scenario has been successfuly injected!")
@@ -117,7 +116,7 @@ class abstract_node_scenarios:
)
logging.error("restart_kubelet_scenario injection failed!")
raise e
self.add_affected_node(affected_node)
self.affected_nodes_status.affected_nodes.append(affected_node)
# Node scenario to crash the node
def node_crash_scenario(self, instance_kill_count, node, timeout):
@@ -125,7 +124,7 @@ class abstract_node_scenarios:
try:
logging.info("Starting node_crash_scenario injection")
logging.info("Crashing the node %s" % (node))
runcommand.invoke(
runcommand.run(
"oc debug node/" + node + " -- chroot /host "
"dd if=/dev/urandom of=/proc/sysrq-trigger"
)
@@ -136,7 +135,7 @@ class abstract_node_scenarios:
"Test Failed" % (e)
)
logging.error("node_crash_scenario injection failed!")
raise e
return 1
# Node scenario to check service status on helper node
def node_service_status(self, node, service, ssh_private_key, timeout):

View File

@@ -10,6 +10,7 @@ import time
import traceback
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
from krkn_lib.utils import get_random_string
class BM:
def __init__(self, bm_info, user, passwd):
@@ -21,6 +22,17 @@ class BM:
with oc.project("openshift-machine-api"):
return oc.selector("node/" + node_name).object()
def get_bm_disks(self, node_name):
if (
self.bm_info is not None
and node_name in self.bm_info
and "disks" in self.bm_info[node_name]
):
return self.bm_info[node_name]["disks"]
else:
return []
# Get the ipmi or other BMC address of the baremetal node
def get_bmc_addr(self, node_name):
# Addresses in the config get higher priority.
@@ -228,3 +240,104 @@ class bm_node_scenarios(abstract_node_scenarios):
logging.error("node_reboot_scenario injection failed!")
raise e
self.affected_nodes_status.affected_nodes.append(affected_node)
def node_disk_detach_attach_scenario(self, instance_kill_count, node, timeout, duration):
logging.info("Starting disk_detach_attach_scenario injection")
disk_attachment_details = self.get_disk_attachment_info(instance_kill_count, node)
if disk_attachment_details:
self.disk_detach_scenario(instance_kill_count, node, disk_attachment_details, timeout)
logging.info("Waiting for %s seconds before attaching the disk" % (duration))
time.sleep(duration)
self.disk_attach_scenario(instance_kill_count, node, disk_attachment_details)
logging.info("node_disk_detach_attach_scenario has been successfully injected!")
else:
logging.error("Node %s has only root disk attached" % (node))
logging.error("node_disk_detach_attach_scenario failed!")
# Get volume attachment info
def get_disk_attachment_info(self, instance_kill_count, node):
for _ in range(instance_kill_count):
try:
logging.info("Obtaining disk attachment information")
user_disks= self.bm.get_bm_disks(node)
disk_pod_name = f"disk-pod-{get_random_string(5)}"
cmd = '''bootdev=$(lsblk -no PKNAME $(findmnt -no SOURCE /boot));
for path in /sys/block/*/device/state; do
dev=$(basename $(dirname $(dirname "$path")));
[[ "$dev" != "$bootdev" ]] && echo "$dev";
done'''
pod_command = ["chroot /host /bin/sh -c '" + cmd + "'"]
disk_response = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk response: %s" % (disk_response))
node_disks = [disk for disk in disk_response.split("\n") if disk]
logging.info("Node disks: %s" % (node_disks))
offline_disks = [disk for disk in user_disks if disk in node_disks]
return offline_disks if offline_disks else node_disks
except Exception as e:
logging.error(
"Failed to obtain disk attachment information of %s node. "
"Encounteres following exception: %s." % (node, e)
)
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")
# Node scenario to detach the volume
def disk_detach_scenario(self, instance_kill_count, node, disk_attachment_details, timeout):
for _ in range(instance_kill_count):
try:
logging.info("Starting disk_detach_scenario injection")
logging.info(
"Detaching the %s disks from instance %s "
% (disk_attachment_details, node)
)
disk_pod_name = f"detach-disk-pod-{get_random_string(5)}"
detach_disk_command=''
for disk in disk_attachment_details:
detach_disk_command = detach_disk_command + "echo offline > /sys/block/" + disk + "/device/state;"
pod_command = ["chroot /host /bin/sh -c '" + detach_disk_command + "'"]
cmd_output = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk command output: %s" % (cmd_output))
logging.info("Disk %s has been detached from %s node" % (disk_attachment_details, node))
except Exception as e:
logging.error(
"Failed to detach disk from %s node. Encountered following"
"exception: %s." % (node, e)
)
logging.debug("")
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")
# Node scenario to attach the volume
def disk_attach_scenario(self, instance_kill_count, node, disk_attachment_details):
for _ in range(instance_kill_count):
try:
logging.info(
"Attaching the %s disks from instance %s "
% (disk_attachment_details, node)
)
disk_pod_name = f"attach-disk-pod-{get_random_string(5)}"
attach_disk_command=''
for disk in disk_attachment_details:
attach_disk_command = attach_disk_command + "echo running > /sys/block/" + disk + "/device/state;"
pod_command = ["chroot /host /bin/sh -c '" + attach_disk_command + "'"]
cmd_output = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk command output: %s" % (cmd_output))
logging.info("Disk %s has been attached to %s node" % (disk_attachment_details, node))
except Exception as e:
logging.error(
"Failed to attach disk to %s node. Encountered following"
"exception: %s." % (node, e)
)
logging.debug("")
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")

View File

@@ -1,16 +1,10 @@
import datetime
import time
import random
import logging
import paramiko
from krkn_lib.models.k8s import AffectedNode
import krkn.invoke.command as runcommand
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
from krkn_lib.models.k8s import AffectedNode
node_general = False
def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
killable_nodes = kubecli.list_killable_nodes()
@@ -65,14 +59,6 @@ def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_nod
return affected_node
# Get the ip of the cluster node
def get_node_ip(node):
return runcommand.invoke(
"kubectl get node %s -o "
"jsonpath='{.status.addresses[?(@.type==\"InternalIP\")].address}'" % (node)
)
def check_service_status(node, service, ssh_private_key, timeout):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

View File

@@ -36,10 +36,25 @@ class IbmCloud:
self.service = VpcV1(authenticator=authenticator)
self.service.set_service_url(service_url)
except Exception as e:
logging.error("error authenticating" + str(e))
def configure_ssl_verification(self, disable_ssl_verification):
"""
Configure SSL verification for IBM Cloud VPC service.
Args:
disable_ssl_verification: If True, disables SSL verification.
"""
logging.info(f"Configuring SSL verification: disable_ssl_verification={disable_ssl_verification}")
if disable_ssl_verification:
self.service.set_disable_ssl_verification(True)
logging.info("SSL verification disabled for IBM Cloud VPC service")
else:
self.service.set_disable_ssl_verification(False)
logging.info("SSL verification enabled for IBM Cloud VPC service")
# Get the instance ID of the node
def get_instance_id(self, node_name):
node_list = self.list_instances()
@@ -260,9 +275,13 @@ class IbmCloud:
@dataclass
class ibm_node_scenarios(abstract_node_scenarios):
def __init__(self, kubecli: KrknKubernetes, node_action_kube_check: bool, affected_nodes_status: AffectedNodeStatus):
def __init__(self, kubecli: KrknKubernetes, node_action_kube_check: bool, affected_nodes_status: AffectedNodeStatus, disable_ssl_verification: bool):
super().__init__(kubecli, node_action_kube_check, affected_nodes_status)
self.ibmcloud = IbmCloud()
# Configure SSL verification
self.ibmcloud.configure_ssl_verification(disable_ssl_verification)
self.node_action_kube_check = node_action_kube_check
def node_start_scenario(self, instance_kill_count, node, timeout):
@@ -327,7 +346,7 @@ class ibm_node_scenarios(abstract_node_scenarios):
logging.info("Starting node_reboot_scenario injection")
logging.info("Rebooting the node %s " % (node))
self.ibmcloud.reboot_instances(instance_id)
self.ibmcloud.wait_until_rebooted(instance_id, timeout)
self.ibmcloud.wait_until_rebooted(instance_id, timeout, affected_node)
if self.node_action_kube_check:
nodeaction.wait_for_unknown_status(
node, timeout, affected_node

View File

@@ -120,7 +120,8 @@ class NodeActionsScenarioPlugin(AbstractScenarioPlugin):
node_scenario["cloud_type"].lower() == "ibm"
or node_scenario["cloud_type"].lower() == "ibmcloud"
):
return ibm_node_scenarios(kubecli, node_action_kube_check, affected_nodes_status)
disable_ssl_verification = get_yaml_item_value(node_scenario, "disable_ssl_verification", True)
return ibm_node_scenarios(kubecli, node_action_kube_check, affected_nodes_status, disable_ssl_verification)
else:
logging.error(
"Cloud type "

View File

@@ -0,0 +1,21 @@
from dataclasses import dataclass
@dataclass
class InputParams:
def __init__(self, config: dict[str,any] = None):
if config:
self.kill = config["kill"] if "kill" in config else 1
self.timeout = config["timeout"] if "timeout" in config else 120
self.duration = config["duration"] if "duration" in config else 10
self.krkn_pod_recovery_time = config["krkn_pod_recovery_time"] if "krkn_pod_recovery_time" in config else 120
self.label_selector = config["label_selector"] if "label_selector" in config else ""
self.namespace_pattern = config["namespace_pattern"] if "namespace_pattern" in config else ""
self.name_pattern = config["name_pattern"] if "name_pattern" in config else ""
namespace_pattern: str
krkn_pod_recovery_time: int
timeout: int
duration: int
kill: int
label_selector: str
name_pattern: str

View File

@@ -0,0 +1,164 @@
import logging
import random
import time
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn.scenario_plugins.pod_disruption.models.models import InputParams
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
from datetime import datetime
from dataclasses import dataclass
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
@dataclass
class Pod:
namespace: str
name: str
creation_timestamp : str
class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
try:
with open(scenario, "r") as f:
cont_scenario_config = yaml.full_load(f)
for kill_scenario in cont_scenario_config:
kill_scenario_config = InputParams(kill_scenario["config"])
self.start_monitoring(
kill_scenario_config, pool
)
return_status = self.killing_pods(
kill_scenario_config, lib_telemetry.get_lib_kubernetes()
)
if return_status != 0:
result = pool.cancel()
else:
result = pool.join()
if result.error:
logging.error(
logging.error(
f"PodDisruptionScenariosPlugin pods failed to recovery: {result.error}"
)
)
return 1
scenario_telemetry.affected_pods = result
except (RuntimeError, Exception) as e:
logging.error("PodDisruptionScenariosPlugin exiting due to Exception %s" % e)
return 1
else:
return 0
def get_scenario_types(self) -> list[str]:
return ["pod_disruption_scenarios"]
def start_monitoring(self, kill_scenario: InputParams, pool: PodsMonitorPool):
recovery_time = kill_scenario.krkn_pod_recovery_time
if (
kill_scenario.namespace_pattern
and kill_scenario.label_selector
):
namespace_pattern = kill_scenario.namespace_pattern
label_selector = kill_scenario.label_selector
pool.select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod label pattern: {label_selector} namespace pattern: {namespace_pattern}"
)
elif (
kill_scenario.namespace_pattern
and kill_scenario.name_pattern
):
namespace_pattern = kill_scenario.namespace_pattern
name_pattern = kill_scenario.name_pattern
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
pod_name_pattern=name_pattern,
namespace_pattern=namespace_pattern,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
)
else:
raise Exception(
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
)
def get_pods(self, name_pattern, label_selector,namespace, kubecli: KrknKubernetes, field_selector: str =None):
if label_selector and name_pattern:
logging.error('Only, one of name pattern or label pattern can be specified')
elif label_selector:
pods = kubecli.select_pods_by_namespace_pattern_and_label(label_selector=label_selector,namespace_pattern=namespace, field_selector=field_selector)
elif name_pattern:
pods = kubecli.select_pods_by_name_pattern_and_namespace_pattern(pod_name_pattern=name_pattern, namespace_pattern=namespace, field_selector=field_selector)
else:
logging.error('Name pattern or label pattern must be specified ')
return pods
def killing_pods(self, config: InputParams, kubecli: KrknKubernetes):
# region Select target pods
namespace = config.namespace_pattern
if not namespace:
logging.error('Namespace pattern must be specified')
pods = self.get_pods(config.name_pattern,config.label_selector,config.namespace_pattern, kubecli, field_selector="status.phase=Running")
pods_count = len(pods)
if len(pods) < config.kill:
logging.error("Not enough pods match the criteria, expected {} but found only {} pods".format(
config.kill, len(pods)))
return 1
random.shuffle(pods)
for i in range(config.kill):
pod = pods[i]
logging.info(pod)
logging.info(f'Deleting pod {pod[0]}')
kubecli.delete_pod(pod[0], pod[1])
self.wait_for_pods(config.label_selector,config.name_pattern,config.namespace_pattern, pods_count, config.duration, config.timeout, kubecli)
return 0
def wait_for_pods(
self, label_selector, pod_name, namespace, pod_count, duration, wait_timeout, kubecli: KrknKubernetes
):
timeout = False
start_time = datetime.now()
while not timeout:
pods = self.get_pods(name_pattern=pod_name, label_selector=label_selector,namespace=namespace, field_selector="status.phase=Running", kubecli=kubecli)
if pod_count == len(pods):
return
time.sleep(duration)
now_time = datetime.now()
time_diff = now_time - start_time
if time_diff.seconds > wait_timeout:
logging.error("timeout while waiting for pods to come up")
return 1
return 0

View File

@@ -32,6 +32,7 @@ class ZoneOutageScenarioPlugin(AbstractScenarioPlugin):
zone_outage_config_yaml = yaml.full_load(f)
scenario_config = zone_outage_config_yaml["zone_outage"]
cloud_type = scenario_config["cloud_type"]
kube_check = get_yaml_item_value(scenario_config, "kube_check", True)
start_time = int(time.time())
if cloud_type.lower() == "aws":
self.cloud_object = AWS()
@@ -40,7 +41,7 @@ class ZoneOutageScenarioPlugin(AbstractScenarioPlugin):
kubecli = lib_telemetry.get_lib_kubernetes()
if cloud_type.lower() == "gcp":
affected_nodes_status = AffectedNodeStatus()
self.cloud_object = gcp_node_scenarios(kubecli, affected_nodes_status)
self.cloud_object = gcp_node_scenarios(kubecli, kube_check, affected_nodes_status)
self.node_based_zone(scenario_config, kubecli)
affected_nodes_status = self.cloud_object.affected_nodes_status
scenario_telemetry.affected_nodes.extend(affected_nodes_status.affected_nodes)

View File

@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.6
krkn-lib==5.0.2
krkn-lib==5.1.0
lxml==5.1.0
kubernetes==28.1.0
numpy==1.26.4
@@ -28,7 +28,7 @@ pyfiglet==1.0.2
pytest==8.0.0
python-ipmi==0.5.4
python-openstackclient==6.5.0
requests==2.32.2
requests==2.32.4
service_identity==24.1.0
PyYAML==6.0.1
setuptools==78.1.1
@@ -36,7 +36,6 @@ werkzeug==3.0.6
wheel==0.42.0
zope.interface==5.4.0
git+https://github.com/krkn-chaos/arcaflow-plugin-kill-pod.git@v0.1.0
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability
protobuf>=4.25.8 # not directly required, pinned by Snyk to avoid a vulnerability

View File

@@ -16,7 +16,6 @@ from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.models.elastic import ElasticChaosRunTelemetry
from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
import krkn.performance_dashboards.setup as performance_dashboards
import krkn.prometheus as prometheus_plugin
import server as server
from krkn_lib.k8s import KrknKubernetes
@@ -69,14 +68,6 @@ def main(cfg) -> int:
wait_duration = get_yaml_item_value(config["tunings"], "wait_duration", 60)
iterations = get_yaml_item_value(config["tunings"], "iterations", 1)
daemon_mode = get_yaml_item_value(config["tunings"], "daemon_mode", False)
deploy_performance_dashboards = get_yaml_item_value(
config["performance_monitoring"], "deploy_dashboards", False
)
dashboard_repo = get_yaml_item_value(
config["performance_monitoring"],
"repo",
"https://github.com/cloud-bulldozer/performance-dashboards.git",
)
prometheus_url = config["performance_monitoring"].get("prometheus_url")
prometheus_bearer_token = config["performance_monitoring"].get(
@@ -240,10 +231,6 @@ def main(cfg) -> int:
logging.info("Server URL: %s" % kubecli.get_host())
# Deploy performance dashboards
if deploy_performance_dashboards:
performance_dashboards.setup(dashboard_repo, distribution)
# Initialize the start iteration to 0
iteration = 0

5
scenarios/kind/pod_etcd.yml Executable file
View File

@@ -0,0 +1,5 @@
- id: kill-pods
config:
namespace_pattern: "kube-system"
label_selector: "component=etcd"
krkn_pod_recovery_time: 120

View File

@@ -5,6 +5,7 @@ image: quay.io/krkn-chaos/krkn-hog
namespace: default
cpu-load-percentage: 90
cpu-method: all
# node-name: "worker-0" # Uncomment to target a specific node by name
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: 2
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -6,10 +6,11 @@ namespace: default
io-block-size: 1m
io-write-bytes: 1g
io-target-pod-folder: /hog-data
# node-name: "worker-0" # Uncomment to target a specific node by name
io-target-pod-volume:
name: node-volume
hostPath:
path: /root # a path writable by kubelet in the root filesystem of the node
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: ''
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -4,6 +4,7 @@ hog-type: memory
image: quay.io/krkn-chaos/krkn-hog
namespace: default
memory-vm-bytes: 90%
# node-name: "worker-0" # Uncomment to target a specific node by name
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: ''
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -1,13 +0,0 @@
- id: node_network_filter
wait_duration: 300
test_duration: 100
label_selector: "kubernetes.io/hostname=ip-10-0-39-182.us-east-2.compute.internal"
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: node
interfaces: []
ports:
- 2049

View File

@@ -0,0 +1,18 @@
- id: node_network_filter
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 10
label_selector: "<node_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: '<node_name>'
interfaces: []
ports:
- 2309
protocols:
- tcp

View File

@@ -0,0 +1,19 @@
- id: pod_network_filter
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: "<pod_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: "<pod_name>"
interfaces: []
protocols:
- tcp
- udp
ports:
- 53

View File

@@ -0,0 +1,7 @@
scenarios:
- name: "kubevirt outage test"
scenario: kubevirt_vm_outage
parameters:
vm_name: <vm-name>
namespace: <namespace>
timeout: 60

View File

@@ -1,19 +1,33 @@
node_scenarios:
- actions: # Node chaos scenarios to be injected.
- node_stop_start_scenario
node_name: # Node on which scenario has to be injected.
- actions: # Node chaos scenarios to be injected
- node_stop_start_scenario # Action to run. Supported actions: node_stop, node_restart, node_stop_start etc. Please refer documentation
node_name: # Node(s) on which scenario has to be injected separated by a comma
label_selector: node-role.kubernetes.io/worker # When node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection.
instance_count: 1 # Number of nodes to perform action/select that match the label selector.
instance_count: 1 # Number of nodes to perform action/select that match the label selector
runs: 1 # Number of times to inject each scenario under actions (will perform on same node each time).
timeout: 360 # Duration to wait for completion of node scenario injection.
duration: 120 # Duration to stop the node before running the start action
parallel: False # Run action on label or node name in parallel or sequential, set to true for parallel
cloud_type: bm # Cloud type on which Kubernetes/OpenShift runs.
bmc_user: defaultuser # For baremetal (bm) cloud type. The default IPMI username. Optional if specified for all machines.
bmc_password: defaultpass # For baremetal (bm) cloud type. The default IPMI password. Optional if specified for all machines.
kube_check: True # Run the kubernetes api calls to see if the node gets to a certain state during the node scenario
bmc_user: defaultuser # For baremetal (bm) cloud type. The default IPMI username. Optional if specified for all machines
bmc_password: defaultpass # For baremetal (bm) cloud type. The default IPMI password. Optional if specified for all machines
bmc_info: # This section is here to specify baremetal per-machine info, so it is optional if there is no per-machine info.
node-1: # The node name for the baremetal machine
bmc_addr: mgmt-machine1.example.com # Optional. For baremetal nodes with the IPMI BMC address missing from 'oc get bmh'.
bmc_addr: mgmt-machine1.example.com # Optional. For baremetal nodes with the IPMI BMC address missing from 'oc get bmh'
node-2:
bmc_addr: mgmt-machine2.example.com
bmc_user: user # The baremetal IPMI user. Overrides the default IPMI user specified above. Optional if the default is set.
bmc_user: user # The baremetal IPMI user. Overrides the default IPMI user specified above. Optional if the default is set
bmc_password: pass # The baremetal IPMI password. Overrides the default IPMI user specified above. Optional if the default is set
- actions:
- node_disk_detach_attach_scenario
node_name: node-1
instance_count: 1
runs: 1
timeout: 360
duration: 120
parallel: False
cloud_type: bm
bmc_info:
node-1:
disks: ["sda", "sdb"] # List of disk devices to be used for disk detach/attach scenarios

View File

@@ -7,10 +7,12 @@ node_scenarios:
timeout: 360
duration: 120
cloud_type: ibm
disable_ssl_verification: true # Set to true for CI environments with certificate issues
- actions:
- node_reboot_scenario
node_name:
label_selector: node-role.kubernetes.io/worker
instance_count: 1
timeout: 120
cloud_type: ibm
cloud_type: ibm
disable_ssl_verification: true # Set to true for CI environments with certificate issues

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,215 @@
import unittest
import time
from unittest.mock import MagicMock, patch
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
# Mock custom resource definition list with KubeVirt CRDs
crd_list = MagicMock()
crd_item = MagicMock()
crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io"
crd_list.items = [crd_item]
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
},
"status": {
"phase": "Running"
}
}
# Create test config
self.config = {
"scenarios": [
{
"name": "kubevirt outage test",
"scenario": "kubevirt_vm_outage",
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"duration": 0
}
}
]
}
# Create a temporary config file
import tempfile, os
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
yaml.dump(self.config, f)
# Mock dependencies
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject and recover to simulate success
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_called_once_with("test-vm", "default", False)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject to simulate failure
with patch.object(self.plugin, 'inject', return_value=1) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_not_called()
def test_disable_auto_restart(self):
"""
Test VM auto-restart can be disabled
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Mock VM object for patching
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {}
}
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock VM patch operation
with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm:
mock_patch_vm.return_value = True
# Mock inject and recover
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# Should call patch_vm_spec to disable auto-restart
mock_patch_vm.assert_any_call("test-vm", "default", False)
# Should call patch_vm_spec to re-enable auto-restart during recovery
mock_patch_vm.assert_any_call("test-vm", "default", True)
mock_inject.assert_called_once_with("test-vm", "default", True)
mock_recover.assert_called_once_with("test-vm", "default", True)
def test_recovery_when_vmi_does_not_exist(self):
"""
Test recovery logic when VMI does not exist after deletion
"""
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
)
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Mock empty CRD list (no KubeVirt CRDs)
empty_crd_list = MagicMock()
empty_crd_list.items = []
self.k8s_client.list_custom_resource_definition.return_value = empty_crd_list
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI (never gets deleted)
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.inject("test-vm", "default", False)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
if __name__ == "__main__":
unittest.main()