Compare commits

...

15 Commits

Author SHA1 Message Date
Tullio Sebastiani
fff675f3dd added service account to Network Chaos NG workload (#870)
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-23 10:17:50 +02:00
Naga Ravi Chaitanya Elluri
c125e5acf7 Update network scenario image
This commit updates the fedora tools image reference used by the network scenarios
to the one hosted in the krkn-chaos quay org. It also fixes the issue of RHACS
flagging runs that use the latest tag by switching to the tools tag instead.

Signed-off-by: Naga Ravi Chaitanya Elluri <nelluri@redhat.com>
2025-07-22 14:29:00 -04:00
Naga Ravi Chaitanya Elluri
ca6995a1a1 [Snyk] Fix for 3 vulnerabilities (#859)
* fix: requirements.txt to reduce vulnerabilities


The following vulnerabilities are fixed by pinning transitive dependencies:
- https://snyk.io/vuln/SNYK-PYTHON-PROTOBUF-10364902
- https://snyk.io/vuln/SNYK-PYTHON-URLLIB3-10390193
- https://snyk.io/vuln/SNYK-PYTHON-URLLIB3-10390194

* partial vulnerability fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-22 16:50:31 +02:00
Sahil Shah
50cf91ac9e Disable SSL verification for IBM node scenarios and fix node reboot s… (#861)
* Disable SSL verification for IBM node scenarios and fix node reboot scenario

Signed-off-by: Sahil Shah <sahshah@redhat.com>

* adding disable ssl as a scenario parameter for ibmcloud

Signed-off-by: Sahil Shah <sahshah@redhat.com>

---------

Signed-off-by: Sahil Shah <sahshah@redhat.com>
2025-07-16 12:48:45 -04:00
Tullio Sebastiani
11069c6982 added tolerations to node network filter pod deployment (#867) 2025-07-16 17:11:46 +02:00
Charles Uneze
106d9bf1ae A working kind config (#866)
Signed-off-by: Charles Uneze <charlesniklaus@gmail.com>
2025-07-15 10:25:01 -04:00
Abhinav Sharma
17f832637c feat: add optional node-name field to hog scenarios with precedence over node-selector (#831)
Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2025-07-11 14:10:16 -04:00
Paige Patton
0e5c8c55a4 adding details of node for hog failure
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 16:49:28 -04:00
Tullio Sebastiani
9d9a6f9b80 added missing parameters to node-network-filter + added default values
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 13:22:50 -04:00
Anshuman Panda
f8fe2ae5b7 Refactor: use krkn-lib to get the node IP and remove invoke function usage
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:21:10 -04:00
Paige Patton
77b1dd32c7 adding kubevirt with pod timing
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:37 -04:00
Anshuman Panda
9df727ccf5 Ensure metrics are always saved with improved local fallback
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:07 -04:00
Tullio Sebastiani
70c8fec705 added pod-network-filter funtest (#863)
* added pod-network-filter funtest

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* updated kind settings

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 09:35:59 +02:00
Abhinav Sharma
0731144a6b Add support for triggering kubevirt VM outages (#816)
* add requirement for kubevirt_vm_outage

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add initial init and kubevirt_plugin files

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add scenario in  kubevirt-vm-outage.yaml

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init, get_scenario_types, run and placeholder for inject and recover functions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init client, execute_scenario, validate environment, inject and get_VMinstance functions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add test for kubevirt_vm_outage feature

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* improve KubeVirt recovery logic and update dependencies

Signed-off-by: Paige Patton <prubenda@redhat.com>

* refactor(kubevirt): use KrknKubernetes client for KubeVirt operations

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: Add auto-restart disable option and simplify code

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: remove use of the external kubevirt package.

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* adding a few changes and the scenario to the config file

Signed-off-by: Paige Patton <prubenda@redhat.com>

* removing docs

Signed-off-by: Paige Patton <prubenda@redhat.com>

* no affected pods

Signed-off-by: Paige Patton <prubenda@redhat.com>

---------

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Paige Patton <prubenda@redhat.com>
2025-07-08 14:04:57 -04:00
yogananth subramanian
9337052e7b Fix bm_node_scenarios.py
Fix the logic in the disk disruption scenario, which was not returning the right set of disks to be off-lined.

Signed-off-by: Yogananth Subramanian <ysubrama@redhat.com>
2025-07-07 13:49:33 -04:00
39 changed files with 873 additions and 85 deletions

View File

@@ -21,27 +21,7 @@ jobs:
helm repo add stable https://charts.helm.sh/stable
helm repo update
- name: Deploy prometheus & Port Forwarding
run: |
kubectl create namespace prometheus-k8s
helm install \
--wait --timeout 360s \
kind-prometheus \
prometheus-community/kube-prometheus-stack \
--namespace prometheus-k8s \
--set prometheus.service.nodePort=30000 \
--set prometheus.service.type=NodePort \
--set grafana.service.nodePort=31000 \
--set grafana.service.type=NodePort \
--set alertmanager.service.nodePort=32000 \
--set alertmanager.service.type=NodePort \
--set prometheus-node-exporter.service.nodePort=32001 \
--set prometheus-node-exporter.service.type=NodePort \
--set prometheus.prometheusSpec.maximumStartupDurationSeconds=300
SELECTOR=`kubectl -n prometheus-k8s get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'`
POD_NAME=`kubectl -n prometheus-k8s get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'`
kubectl -n prometheus-k8s port-forward $POD_NAME 9090:9090 &
sleep 5
uses: redhat-chaos/actions/prometheus@main
- name: Install Python
uses: actions/setup-python@v4
with:
@@ -89,6 +69,7 @@ jobs:
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Push on main only steps + all other functional to collect coverage
@@ -119,6 +100,7 @@ jobs:
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Final common steps
- name: Run Functional tests

View File

@@ -8,9 +8,9 @@ spec:
hostNetwork: true
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c
- |
sleep infinity
sleep infinity

View File

@@ -0,0 +1,29 @@
apiVersion: v1
kind: Pod
metadata:
name: pod-network-filter-test
labels:
app.kubernetes.io/name: pod-network-filter
spec:
containers:
- name: nginx
image: quay.io/krkn-chaos/krkn-funtests:pod-network-filter
ports:
- containerPort: 5000
name: pod-network-prt
---
apiVersion: v1
kind: Service
metadata:
name: pod-network-filter-service
spec:
selector:
app.kubernetes.io/name: pod-network-filter
type: NodePort
ports:
- name: pod-network-filter-svc
protocol: TCP
port: 80
targetPort: pod-network-prt
nodePort: 30037

View File

@@ -8,9 +8,9 @@ spec:
hostNetwork: true
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c
- |
sleep infinity
sleep infinity

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_cpu_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/cpu-hog.yml"

View File

@@ -5,12 +5,13 @@ source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_io_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/io-hog.yml"
export post_config=""
cat $scenario_file
envsubst < CI/config/common_test_config.yaml > CI/config/io_hog.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/io_hog.yaml
echo "IO Hog: Success"

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_memory_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/memory-hog.yml"
export post_config=""

View File

@@ -0,0 +1,59 @@
function functional_pod_network_filter {
export SERVICE_URL="http://localhost:8889"
export scenario_type="network_chaos_ng_scenarios"
export scenario_file="scenarios/kube/pod-network-filter.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_network_filter.yaml
yq -i '.[0].test_duration=10' scenarios/kube/pod-network-filter.yml
yq -i '.[0].label_selector=""' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ingress=false' scenarios/kube/pod-network-filter.yml
yq -i '.[0].egress=true' scenarios/kube/pod-network-filter.yml
yq -i '.[0].target="pod-network-filter-test"' scenarios/kube/pod-network-filter.yml
yq -i '.[0].protocols=["tcp"]' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ports=[443]' scenarios/kube/pod-network-filter.yml
## Test webservice deployment
kubectl apply -f ./CI/templates/pod_network_filter.yaml
COUNTER=0
while true
do
curl $SERVICE_URL
EXITSTATUS=$?
if [ "$EXITSTATUS" -eq "0" ]
then
break
fi
sleep 1
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
done
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > /dev/null 2>&1 &
PID=$!
# wait until the dns resolution starts failing and the service returns 400
DNS_FAILURE_STATUS=0
while true
do
OUT_STATUS_CODE=$(curl -X GET -s -o /dev/null -I -w "%{http_code}" $SERVICE_URL)
if [ "$OUT_STATUS_CODE" -eq "404" ]
then
DNS_FAILURE_STATUS=404
fi
if [ "$DNS_FAILURE_STATUS" -eq "404" ] && [ "$OUT_STATUS_CODE" -eq "200" ]
then
echo "service restored"
break
fi
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
sleep 2
done
wait $PID
}
functional_pod_network_filter

View File

@@ -46,7 +46,10 @@ kraken:
- syn_flood_scenarios:
- scenarios/kube/syn_flood.yaml
- network_chaos_ng_scenarios:
- scenarios/kube/network-filter.yml
- scenarios/kube/pod-network-filter.yml
- scenarios/kube/node-network-filter.yml
- kubevirt_vm_outage:
- scenarios/kubevirt/kubevirt-vm-outage.yaml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed

View File

@@ -7,10 +7,8 @@ kraken:
signal_state: RUN # Will wait for the RUN signal when set to PAUSE before running the scenarios, refer docs/signal.md for more details
signal_address: 0.0.0.0 # Signal listening address
chaos_scenarios: # List of policies/chaos scenarios to load
- plugin_scenarios:
- scenarios/kind/scheduler.yml
- node_scenarios:
- scenarios/kind/node_scenarios_example.yml
- pod_disruption_scenarios:
- scenarios/kube/pod.yml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed
@@ -26,7 +24,18 @@ performance_monitoring:
enable_alerts: False # Runs the queries specified in the alert profile and displays the info or exits 1 when severity=error
alert_profile: config/alerts.yaml # Path to alert profile with the prometheus queries
elastic:
enable_elastic: False
tunings:
wait_duration: 60 # Duration to wait between each chaos scenario
iterations: 1 # Number of times to execute the scenarios
daemon_mode: False # Iterations are set to infinity which means that the kraken will cause chaos forever
telemetry:
enabled: False # enable/disables the telemetry collection feature
archive_path: /tmp # local path where the archive files will be temporarily stored
events_backup: False # enables/disables cluster events collection
logs_backup: False
health_checks: # Utilizing health check endpoints to observe application behavior during chaos injection.

View File

@@ -10,7 +10,7 @@ RUN go mod edit -go 1.23.1 &&\
go get github.com/docker/docker@v25.0.6&&\
go get github.com/opencontainers/runc@v1.1.14&&\
go get github.com/go-git/go-git/v5@v5.13.0&&\
go get golang.org/x/net@v0.36.0&&\
go get golang.org/x/net@v0.38.0&&\
go get github.com/containerd/containerd@v1.7.27&&\
go get golang.org/x/oauth2@v0.27.0&&\
go get golang.org/x/crypto@v0.35.0&&\
@@ -47,7 +47,7 @@ RUN if [ -n "$PR_NUMBER" ]; then git fetch origin pull/${PR_NUMBER}/head:pr-${PR
RUN if [ -n "$TAG" ]; then git checkout "$TAG";fi
RUN python3.9 -m ensurepip --upgrade --default-pip
RUN python3.9 -m pip install --upgrade pip setuptools==70.0.0
RUN python3.9 -m pip install --upgrade pip setuptools==78.1.1
RUN pip3.9 install -r requirements.txt
RUN pip3.9 install jsonschema

View File

@@ -5,6 +5,8 @@ nodes:
extraPortMappings:
- containerPort: 30036
hostPort: 8888
- containerPort: 30037
hostPort: 8889
- role: control-plane
- role: control-plane
- role: worker

View File

@@ -9,6 +9,7 @@ import logging
import urllib3
import sys
import json
import tempfile
import yaml
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -251,11 +252,29 @@ def metrics(
metric[k] = v
metric['timestamp'] = str(datetime.datetime.now())
metrics_list.append(metric.copy())
if elastic:
save_metrics = False
if elastic is not None and elastic_metrics_index is not None:
result = elastic.upload_metrics_to_elasticsearch(
run_uuid=run_uuid, index=elastic_metrics_index, raw_data=metrics_list
)
if result == -1:
logging.error("failed to save metrics on ElasticSearch")
save_metrics = True
else:
save_metrics = True
if save_metrics:
local_dir = os.path.join(tempfile.gettempdir(), "krkn_metrics")
os.makedirs(local_dir, exist_ok=True)
local_file = os.path.join(local_dir, f"{elastic_metrics_index}_{run_uuid}.json")
try:
with open(local_file, "w") as f:
json.dump({
"run_uuid": run_uuid,
"metrics": metrics_list
}, f, indent=2)
logging.info(f"Metrics saved to {local_file}")
except Exception as e:
logging.error(f"Failed to save metrics to {local_file}: {e}")
return metrics_list
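
A minimal companion sketch: reading back a file written by the local fallback above, e.g. for a later re-upload attempt. The helper name load_saved_metrics is hypothetical; only the path layout and the {"run_uuid": ..., "metrics": [...]} shape come from the diff above.

import json
import os
import tempfile

def load_saved_metrics(run_uuid: str, index: str) -> list:
    # Hypothetical helper: path layout mirrors the fallback above,
    # i.e. <tmpdir>/krkn_metrics/<index>_<run_uuid>.json
    local_file = os.path.join(tempfile.gettempdir(), "krkn_metrics", f"{index}_{run_uuid}.json")
    if not os.path.exists(local_file):
        return []
    with open(local_file) as f:
        payload = json.load(f)
    # The fallback stores {"run_uuid": ..., "metrics": [...]}
    return payload.get("metrics", [])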

View File

@@ -25,6 +25,10 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
with open(scenario, "r") as f:
scenario = yaml.full_load(f)
scenario_config = HogConfig.from_yaml_dict(scenario)
# Get node-name if provided
node_name = scenario.get('node-name')
has_selector = True
if not scenario_config.node_selector or not re.match("^.+=.*$", scenario_config.node_selector):
if scenario_config.node_selector:
@@ -33,13 +37,19 @@ class HogsScenarioPlugin(AbstractScenarioPlugin):
else:
node_selector = scenario_config.node_selector
available_nodes = lib_telemetry.get_lib_kubernetes().list_nodes(node_selector)
if len(available_nodes) == 0:
raise Exception("no available nodes to schedule workload")
if node_name:
logging.info(f"Using specific node: {node_name}")
all_nodes = lib_telemetry.get_lib_kubernetes().list_nodes("")
if node_name not in all_nodes:
raise Exception(f"Specified node {node_name} not found or not available")
available_nodes = [node_name]
else:
available_nodes = lib_telemetry.get_lib_kubernetes().list_nodes(node_selector)
if len(available_nodes) == 0:
raise Exception("no available nodes to schedule workload")
if not has_selector:
# if selector not specified picks a random node between the available
available_nodes = [available_nodes[random.randint(0, len(available_nodes))]]
if not has_selector:
available_nodes = [available_nodes[random.randint(0, len(available_nodes))]]
if scenario_config.number_of_nodes and len(available_nodes) > scenario_config.number_of_nodes:
available_nodes = random.sample(available_nodes, scenario_config.number_of_nodes)
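
A rough, standalone sketch of the precedence the diff above introduces; pick_target_nodes and list_nodes are illustrative stand-ins, not the plugin's API.

import random

def pick_target_nodes(node_name, node_selector, list_nodes):
    # An explicit node-name wins and only needs to exist in the cluster.
    if node_name:
        if node_name not in list_nodes(""):
            raise Exception(f"Specified node {node_name} not found or not available")
        return [node_name]
    # Otherwise fall back to the label selector.
    nodes = list_nodes(node_selector)
    if not nodes:
        raise Exception("no available nodes to schedule workload")
    # The plugin then narrows this list further (randomly when no selector was given).
    return [random.choice(nodes)]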

View File

@@ -0,0 +1,399 @@
import logging
import time
from typing import Dict, Any, Optional
import random
import re
import yaml
from kubernetes.client.rest import ApiException
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import log_exception
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
"""
A scenario plugin that injects chaos by deleting a KubeVirt Virtual Machine Instance (VMI).
This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
"""
def __init__(self):
self.k8s_client = None
self.original_vmi = None
# Scenario type is handled directly in execute_scenario
def get_scenario_types(self) -> list[str]:
return ["kubevirt_vm_outage"]
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
"""
Main entry point for the plugin.
Parses the scenario configuration and executes the chaos scenario.
"""
try:
with open(scenario, "r") as f:
scenario_config = yaml.full_load(f)
self.init_clients(lib_telemetry.get_lib_kubernetes())
pods_status = PodsStatus()
for config in scenario_config["scenarios"]:
if config.get("scenario") == "kubevirt_vm_outage":
single_pods_status = self.execute_scenario(config, scenario_telemetry)
pods_status.merge(single_pods_status)
scenario_telemetry.affected_pods = pods_status
return 0
except Exception as e:
logging.error(f"KubeVirt VM Outage scenario failed: {e}")
log_exception(e)
return 1
def init_clients(self, k8s_client: KrknKubernetes):
"""
Initialize Kubernetes client for KubeVirt operations.
"""
self.k8s_client = k8s_client
self.custom_object_client = k8s_client.custom_object_client
logging.info("Successfully initialized Kubernetes client for KubeVirt operations")
def get_vmi(self, name: str, namespace: str) -> Optional[Dict]:
"""
Get a Virtual Machine Instance by name and namespace.
:param name: Name of the VMI to retrieve
:param namespace: Namespace of the VMI
:return: The VMI object if found, None otherwise
"""
try:
vmi = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=name
)
return vmi
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {name}: {e}")
raise
def get_vmis(self, regex_name: str, namespace: str) -> Optional[Dict]:
"""
Get a Virtual Machine Instance by name and namespace.
:param name: Name of the VMI to retrieve
:param namespace: Namespace of the VMI
:return: The VMI object if found, None otherwise
"""
try:
vmis = self.custom_object_client.list_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
)
vmi_list = []
for vmi in vmis.get("items"):
vmi_name = vmi.get("metadata",{}).get("name")
match = re.match(regex_name, vmi_name)
if match:
vmi_list.append(vmi)
return vmi_list
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {regex_name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {regex_name}: {e}")
raise
def execute_scenario(self, config: Dict[str, Any], scenario_telemetry: ScenarioTelemetry) -> int:
"""
Execute a KubeVirt VM outage scenario based on the provided configuration.
:param config: The scenario configuration
:param scenario_telemetry: The telemetry object for recording metrics
:return: 0 for success, 1 for failure
"""
try:
params = config.get("parameters", {})
vm_name = params.get("vm_name")
namespace = params.get("namespace", "default")
timeout = params.get("timeout", 60)
kill_count = params.get("kill_count", 1)
disable_auto_restart = params.get("disable_auto_restart", False)
self.pods_status = PodsStatus()
if not vm_name:
logging.error("vm_name parameter is required")
return 1
vmis_list = self.get_vmis(vm_name,namespace)
rand_int = random.randint(0, len(vmis_list) - 1)
vmi = vmis_list[rand_int]
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
vmi_name = vmi.get("metadata").get("name")
if not self.validate_environment(vmi_name, namespace):
return 1
vmi = self.get_vmi(vmi_name, namespace)
self.affected_pod = AffectedPod(
pod_name=vmi_name,
namespace=namespace,
)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return 1
self.original_vmi = vmi
logging.info(f"Captured initial state of VMI: {vm_name}")
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
if result != 0:
return self.pods_status
result = self.wait_for_running(vmi_name,namespace, timeout)
if result != 0:
self.recover(vmi_name, namespace)
self.pods_status.unrecovered = self.affected_pod
return self.pods_status
self.affected_pod.total_recovery_time = (
self.affected_pod.pod_readiness_time
+ self.affected_pod.pod_rescheduling_time
)
self.pods_status.recovered.append(self.affected_pod)
logging.info(f"Successfully completed KubeVirt VM outage scenario for VM: {vm_name}")
return self.pods_status
except Exception as e:
logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
log_exception(e)
return 1
def validate_environment(self, vm_name: str, namespace: str) -> bool:
"""
Validate that KubeVirt is installed and the specified VM exists.
:param vm_name: Name of the VM to validate
:param namespace: Namespace of the VM
:return: True if environment is valid, False otherwise
"""
try:
# Check if KubeVirt CRDs exist
crd_list = self.custom_object_client.list_namespaced_custom_object("kubevirt.io","v1",namespace,"virtualmachines")
kubevirt_crds = [crd for crd in crd_list.items() ]
if not kubevirt_crds:
logging.error("KubeVirt CRDs not found. Ensure KubeVirt/CNV is installed in the cluster")
return False
# Check if VMI exists
vmi = self.get_vmi(vm_name, namespace)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return False
logging.info(f"Validated environment: KubeVirt is installed and VMI {vm_name} exists")
return True
except Exception as e:
logging.error(f"Error validating environment: {e}")
return False
def patch_vm_spec(self, vm_name: str, namespace: str, running: bool) -> bool:
"""
Patch the VM spec to enable/disable auto-restart.
:param vm_name: Name of the VM to patch
:param namespace: Namespace of the VM
:param running: Whether the VM should be set to running state
:return: True if patch was successful, False otherwise
"""
try:
# Get the VM object first to get its current spec
vm = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name
)
# Update the running state
if 'spec' not in vm:
vm['spec'] = {}
vm['spec']['running'] = running
# Apply the patch
self.custom_object_client.patch_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name,
body=vm
)
return True
except ApiException as e:
logging.error(f"Failed to patch VM {vm_name}: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error patching VM {vm_name}: {e}")
return False
def delete_vmi(self, vm_name: str, namespace: str, disable_auto_restart: bool = False, timeout: int = 120) -> int:
"""
Delete a Virtual Machine Instance to simulate a VM outage.
:param vm_name: Name of the VMI to delete
:param namespace: Namespace of the VMI
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Injecting chaos: Deleting VMI {vm_name} in namespace {namespace}")
# If auto-restart should be disabled, patch the VM spec first
if disable_auto_restart:
logging.info(f"Disabling auto-restart for VM {vm_name} by setting spec.running=False")
if not self.patch_vm_spec(vm_name, namespace, running=False):
logging.error("Failed to disable auto-restart for VM"
" - proceeding with deletion but VM may auto-restart")
start_creation_time = self.original_vmi.get('metadata', {}).get('creationTimestamp')
start_time = time.time()
try:
self.custom_object_client.delete_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=vm_name
)
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {vm_name} not found during deletion")
return 1
else:
logging.error(f"API error during VMI deletion: {e}")
return 1
# Wait for the VMI to be deleted
while time.time() - start_time < timeout:
deleted_vmi = self.get_vmi(vm_name, namespace)
if deleted_vmi:
if start_creation_time != deleted_vmi.get('metadata', {}).get('creationTimestamp'):
logging.info(f"VMI {vm_name} successfully recreated")
self.affected_pod.pod_rescheduling_time = time.time() - start_time
return 0
else:
logging.info(f"VMI {vm_name} successfully deleted")
time.sleep(1)
logging.error(f"Timed out waiting for VMI {vm_name} to be deleted")
self.pods_status.unrecovered = self.affected_pod
return 1
except Exception as e:
logging.error(f"Error deleting VMI {vm_name}: {e}")
log_exception(e)
self.pods_status.unrecovered = self.affected_pod
return 1
def wait_for_running(self, vm_name: str, namespace: str, timeout: int = 120) -> int:
start_time = time.time()
while time.time() - start_time < timeout:
# Check current state once since we've already waited for the duration
vmi = self.get_vmi(vm_name, namespace)
if vmi:
if vmi.get('status', {}).get('phase') == "Running":
end_time = time.time()
self.affected_pod.pod_readiness_time = end_time - start_time
logging.info(f"VMI {vm_name} is already running")
return 0
logging.info(f"VMI {vm_name} exists but is not in Running state. Current state: {vmi.get('status', {}).get('phase')}")
else:
logging.info(f"VMI {vm_name} not yet recreated")
time.sleep(1)
return 1
def recover(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
"""
Recover a deleted VMI, either by waiting for auto-recovery or manually recreating it.
:param vm_name: Name of the VMI to recover
:param namespace: Namespace of the VMI
:param disable_auto_restart: Whether auto-restart was disabled during injection
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Attempting to recover VMI {vm_name} in namespace {namespace}")
if self.original_vmi:
logging.info(f"Auto-recovery didn't occur for VMI {vm_name}. Attempting manual recreation")
try:
# Clean up server-generated fields
vmi_dict = self.original_vmi.copy()
if 'metadata' in vmi_dict:
metadata = vmi_dict['metadata']
for field in ['resourceVersion', 'uid', 'creationTimestamp', 'generation']:
if field in metadata:
del metadata[field]
# Create the VMI
self.custom_object_client.create_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
body=vmi_dict
)
logging.info(f"Successfully recreated VMI {vm_name}")
# Wait for VMI to start running
self.wait_for_running(vm_name,namespace)
logging.warning(f"VMI {vm_name} was recreated but didn't reach Running state in time")
return 0 # Still consider it a success as the VMI was recreated
except Exception as e:
logging.error(f"Error recreating VMI {vm_name}: {e}")
log_exception(e)
return 1
else:
logging.error(f"Failed to recover VMI {vm_name}: No original state captured and auto-recovery did not occur")
return 1
except Exception as e:
logging.error(f"Unexpected error recovering VMI {vm_name}: {e}")
log_exception(e)
return 1
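
For orientation, the document that run() iterates over has the shape sketched below; the values are placeholders, grounded only in the keys the plugin reads (vm_name, namespace, timeout, kill_count, disable_auto_restart).

# Illustrative only: the structure parsed by run() and execute_scenario().
example_scenario_config = {
    "scenarios": [
        {
            "name": "kubevirt outage test",
            "scenario": "kubevirt_vm_outage",
            "parameters": {
                "vm_name": "my-vm.*",           # placeholder; matched with re.match against VMI names
                "namespace": "default",
                "timeout": 60,                  # seconds to wait for the VMI to return to Running
                "disable_auto_restart": False,  # patch spec.running=False before deletion
            },
        }
    ]
}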

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command: ["/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true
@@ -22,4 +22,4 @@ spec:
hostPath:
path: /lib/modules
restartPolicy: Never
backoffLimit: 0
backoffLimit: 0

View File

@@ -7,7 +7,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c

View File

@@ -6,7 +6,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: modtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
imagePullPolicy: IfNotPresent
command:
- /bin/sh
@@ -27,4 +27,4 @@ spec:
hostNetwork: true
hostIPC: true
hostPID: true
restartPolicy: Never
restartPolicy: Never

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command: ["chroot", "/host", "/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true

View File

@@ -6,7 +6,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: modtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
imagePullPolicy: IfNotPresent
command:
- /bin/sh
@@ -27,4 +27,4 @@ spec:
hostNetwork: true
hostIPC: true
hostPID: true
restartPolicy: Never
restartPolicy: Never

View File

@@ -9,7 +9,7 @@ spec:
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command: ["/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true

View File

@@ -7,7 +7,7 @@ spec:
nodeName: {{nodename}}
containers:
- name: fedtools
image: docker.io/fedora/tools
image: quay.io/krkn-chaos/krkn:tools
command:
- /bin/sh
- -c

View File

@@ -14,9 +14,11 @@ class BaseNetworkChaosConfig:
wait_duration: int
test_duration: int
label_selector: str
service_account: str
instance_count: int
execution: str
namespace: str
taints: list[str]
def validate(self) -> list[str]:
errors = []

View File

@@ -4,16 +4,26 @@ metadata:
name: {{pod_name}}
namespace: {{namespace}}
spec:
{% if service_account %}
serviceAccountName: {{ service_account }}
{%endif%}
{% if host_network %}
hostNetwork: true
{%endif%}
{% if taints %}
tolerations:
{% for toleration in taints %}
- key: "{{ toleration.key }}"
operator: "{{ toleration.operator }}"
{% if toleration.value %}
value: "{{ toleration.value }}"
{% endif %}
effect: "{{ toleration.effect }}"
{% endfor %}
{% endif %}
hostPID: true
nodeSelector:
kubernetes.io/hostname: {{target}}
tolerations:
- key: "node-role.kubernetes.io/master"
operator: "Exists"
effect: "NoSchedule"
containers:
- name: {{container_name}}
imagePullPolicy: Always

View File

@@ -58,6 +58,27 @@ def deploy_network_filter_pod(
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("templates/network-chaos.j2")
tolerations = []
for taint in config.taints:
key_value_part, effect = taint.split(":", 1)
if "=" in key_value_part:
key, value = key_value_part.split("=", 1)
operator = "Equal"
else:
key = key_value_part
value = None
operator = "Exists"
toleration = {
"key": key,
"operator": operator,
"effect": effect,
}
if value is not None:
toleration["value"] = value
tolerations.append(toleration)
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
@@ -66,6 +87,8 @@ def deploy_network_filter_pod(
target=target_node,
container_name=container_name,
workload_image=config.image,
taints=tolerations,
service_account=config.service_account
)
)
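
The taint strings follow the familiar key[=value]:effect convention; a small standalone worked example of the mapping performed above (the helper name is illustrative, not the module's code).

def taint_to_toleration(taint: str) -> dict:
    # "key=value:NoSchedule" -> Equal toleration; "key:NoSchedule" -> Exists toleration
    key_value_part, effect = taint.split(":", 1)
    if "=" in key_value_part:
        key, value = key_value_part.split("=", 1)
        return {"key": key, "operator": "Equal", "value": value, "effect": effect}
    return {"key": key_value_part, "operator": "Exists", "effect": effect}

# taint_to_toleration("node-role.kubernetes.io/master:NoSchedule")
#   -> {"key": "node-role.kubernetes.io/master", "operator": "Exists", "effect": "NoSchedule"}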

View File

@@ -274,7 +274,7 @@ done'''
logging.info("Disk response: %s" % (disk_response))
node_disks = [disk for disk in disk_response.split("\n") if disk]
logging.info("Node disks: %s" % (node_disks))
offline_disks = [disk for disk in node_disks if disk not in user_disks]
offline_disks = [disk for disk in user_disks if disk in node_disks]
return offline_disks if offline_disks else node_disks
except Exception as e:
logging.error(
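
The one-line change flips the comprehension from "every node disk except the requested ones" to "only the requested disks that actually exist on the node"; a quick illustration with made-up values:

node_disks = ["sda", "sdb", "sdc"]   # disks discovered on the node
user_disks = ["sdb", "sdd"]          # disks requested in the scenario

old = [disk for disk in node_disks if disk not in user_disks]   # ["sda", "sdc"]  (wrong set)
new = [disk for disk in user_disks if disk in node_disks]       # ["sdb"]         (intended set)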

View File

@@ -1,16 +1,10 @@
import datetime
import time
import random
import logging
import paramiko
from krkn_lib.models.k8s import AffectedNode
import krkn.invoke.command as runcommand
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
from krkn_lib.models.k8s import AffectedNode
node_general = False
def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
killable_nodes = kubecli.list_killable_nodes()
@@ -65,14 +59,6 @@ def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_nod
return affected_node
# Get the ip of the cluster node
def get_node_ip(node):
return runcommand.invoke(
"kubectl get node %s -o "
"jsonpath='{.status.addresses[?(@.type==\"InternalIP\")].address}'" % (node)
)
def check_service_status(node, service, ssh_private_key, timeout):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

View File

@@ -36,10 +36,25 @@ class IbmCloud:
self.service = VpcV1(authenticator=authenticator)
self.service.set_service_url(service_url)
except Exception as e:
logging.error("error authenticating" + str(e))
def configure_ssl_verification(self, disable_ssl_verification):
"""
Configure SSL verification for IBM Cloud VPC service.
Args:
disable_ssl_verification: If True, disables SSL verification.
"""
logging.info(f"Configuring SSL verification: disable_ssl_verification={disable_ssl_verification}")
if disable_ssl_verification:
self.service.set_disable_ssl_verification(True)
logging.info("SSL verification disabled for IBM Cloud VPC service")
else:
self.service.set_disable_ssl_verification(False)
logging.info("SSL verification enabled for IBM Cloud VPC service")
# Get the instance ID of the node
def get_instance_id(self, node_name):
node_list = self.list_instances()
@@ -260,9 +275,13 @@ class IbmCloud:
@dataclass
class ibm_node_scenarios(abstract_node_scenarios):
def __init__(self, kubecli: KrknKubernetes, node_action_kube_check: bool, affected_nodes_status: AffectedNodeStatus):
def __init__(self, kubecli: KrknKubernetes, node_action_kube_check: bool, affected_nodes_status: AffectedNodeStatus, disable_ssl_verification: bool):
super().__init__(kubecli, node_action_kube_check, affected_nodes_status)
self.ibmcloud = IbmCloud()
# Configure SSL verification
self.ibmcloud.configure_ssl_verification(disable_ssl_verification)
self.node_action_kube_check = node_action_kube_check
def node_start_scenario(self, instance_kill_count, node, timeout):
@@ -327,7 +346,7 @@ class ibm_node_scenarios(abstract_node_scenarios):
logging.info("Starting node_reboot_scenario injection")
logging.info("Rebooting the node %s " % (node))
self.ibmcloud.reboot_instances(instance_id)
self.ibmcloud.wait_until_rebooted(instance_id, timeout)
self.ibmcloud.wait_until_rebooted(instance_id, timeout, affected_node)
if self.node_action_kube_check:
nodeaction.wait_for_unknown_status(
node, timeout, affected_node

View File

@@ -120,7 +120,8 @@ class NodeActionsScenarioPlugin(AbstractScenarioPlugin):
node_scenario["cloud_type"].lower() == "ibm"
or node_scenario["cloud_type"].lower() == "ibmcloud"
):
return ibm_node_scenarios(kubecli, node_action_kube_check, affected_nodes_status)
disable_ssl_verification = get_yaml_item_value(node_scenario, "disable_ssl_verification", True)
return ibm_node_scenarios(kubecli, node_action_kube_check, affected_nodes_status, disable_ssl_verification)
else:
logging.error(
"Cloud type "

View File

@@ -38,3 +38,4 @@ zope.interface==5.4.0
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability
protobuf>=4.25.8 # not directly required, pinned by Snyk to avoid a vulnerability

View File

@@ -5,6 +5,7 @@ image: quay.io/krkn-chaos/krkn-hog
namespace: default
cpu-load-percentage: 90
cpu-method: all
# node-name: "worker-0" # Uncomment to target a specific node by name
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: 2
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -6,10 +6,11 @@ namespace: default
io-block-size: 1m
io-write-bytes: 1g
io-target-pod-folder: /hog-data
# node-name: "worker-0" # Uncomment to target a specific node by name
io-target-pod-volume:
name: node-volume
hostPath:
path: /root # a path writable by kubelet in the root filesystem of the node
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: ''
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -4,6 +4,7 @@ hog-type: memory
image: quay.io/krkn-chaos/krkn-hog
namespace: default
memory-vm-bytes: 90%
# node-name: "worker-0" # Uncomment to target a specific node by name
node-selector: "node-role.kubernetes.io/worker="
number-of-nodes: ''
taints: [] #example ["node-role.kubernetes.io/master:NoSchedule"]

View File

@@ -1,14 +1,18 @@
- id: node_network_filter
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 300
test_duration: 100
label_selector: "kubernetes.io/hostname=minikube"
wait_duration: 1
test_duration: 10
label_selector: "<node_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: ''
target: '<node_name>'
interfaces: []
ports:
- 53
- 2309
protocols:
- tcp

View File

@@ -2,13 +2,15 @@
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: "app=network-attacked"
label_selector: "<pod_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: ""
target: "<pod_name>"
interfaces: []
protocols:
- tcp

View File

@@ -0,0 +1,7 @@
scenarios:
- name: "kubevirt outage test"
scenario: kubevirt_vm_outage
parameters:
vm_name: <vm-name>
namespace: <namespace>
timeout: 60

View File

@@ -7,10 +7,12 @@ node_scenarios:
timeout: 360
duration: 120
cloud_type: ibm
disable_ssl_verification: true # Set to true for CI environments with certificate issues
- actions:
- node_reboot_scenario
node_name:
label_selector: node-role.kubernetes.io/worker
instance_count: 1
timeout: 120
cloud_type: ibm
cloud_type: ibm
disable_ssl_verification: true # Set to true for CI environments with certificate issues

View File

@@ -0,0 +1,215 @@
import unittest
import time
from unittest.mock import MagicMock, patch
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
# Mock custom resource definition list with KubeVirt CRDs
crd_list = MagicMock()
crd_item = MagicMock()
crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io"
crd_list.items = [crd_item]
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
},
"status": {
"phase": "Running"
}
}
# Create test config
self.config = {
"scenarios": [
{
"name": "kubevirt outage test",
"scenario": "kubevirt_vm_outage",
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"duration": 0
}
}
]
}
# Create a temporary config file
import tempfile, os
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
yaml.dump(self.config, f)
# Mock dependencies
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject and recover to simulate success
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_called_once_with("test-vm", "default", False)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject to simulate failure
with patch.object(self.plugin, 'inject', return_value=1) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_not_called()
def test_disable_auto_restart(self):
"""
Test VM auto-restart can be disabled
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Mock VM object for patching
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {}
}
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock VM patch operation
with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm:
mock_patch_vm.return_value = True
# Mock inject and recover
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# Should call patch_vm_spec to disable auto-restart
mock_patch_vm.assert_any_call("test-vm", "default", False)
# Should call patch_vm_spec to re-enable auto-restart during recovery
mock_patch_vm.assert_any_call("test-vm", "default", True)
mock_inject.assert_called_once_with("test-vm", "default", True)
mock_recover.assert_called_once_with("test-vm", "default", True)
def test_recovery_when_vmi_does_not_exist(self):
"""
Test recovery logic when VMI does not exist after deletion
"""
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
)
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Mock empty CRD list (no KubeVirt CRDs)
empty_crd_list = MagicMock()
empty_crd_list.items = []
self.k8s_client.list_custom_resource_definition.return_value = empty_crd_list
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI (never gets deleted)
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.inject("test-vm", "default", False)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
if __name__ == "__main__":
unittest.main()