Compare commits

...

8 Commits

Author SHA1 Message Date
Paige Patton
0e5c8c55a4 adding details of node for hog failure
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 8m23s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 16:49:28 -04:00
Tullio Sebastiani
9d9a6f9b80 added missing parameters to node-network-filter + added default values
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 13:22:50 -04:00
Anshuman Panda
f8fe2ae5b7 Refactor: use krkn-lib to get the node IP and remove invoke function usage
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:21:10 -04:00
Paige Patton
77b1dd32c7 adding kubevirt with pod timing
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:37 -04:00
Anshuman Panda
9df727ccf5 Ensure metrics are always saved with improved local fallback
Signed-off-by: Anshuman Panda <ichuk0078@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-10 13:19:07 -04:00
Tullio Sebastiani
70c8fec705 added pod-network-filter funtest (#863)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m37s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* added pod-network-filter funtest

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* updated kind settings

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-10 09:35:59 +02:00
Abhinav Sharma
0731144a6b Add support for triggering kubevirt VM outages (#816)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m2s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
* add requirement for kubevirt_vm_outage

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add initial init and kubevirt_plugin files

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add scenario in kubevirt-vm-outage.yaml

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init, get_scenario_types, run and placeholder for inject and recover functions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement init client, execute_scenario, validate environment, inject and get_VMinstance functions

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* implement recover function

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* add test for kubevirt_vm_outage feature

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* improve KubeVirt recovery logic and update dependencies for KubeVirt

Signed-off-by: Paige Patton <prubenda@redhat.com>

* refactor(kubevirt): use KrknKubernetes client for KubeVirt operations

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: Add auto-restart disable option and simplify code

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* chore: remove kubevirt external package used.

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>

* adding a few changes and a scenario in the config file

Signed-off-by: Paige Patton <prubenda@redhat.com>

* removing docs

Signed-off-by: Paige Patton <prubenda@redhat.com>

* no affected pods

Signed-off-by: Paige Patton <prubenda@redhat.com>

---------

Signed-off-by: Abhinav Sharma <abhinavs1920bpl@gmail.com>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Paige Patton <prubenda@redhat.com>
2025-07-08 14:04:57 -04:00
yogananth subramanian
9337052e7b Fix bm_node_scenarios.py
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 10m29s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped
Fix the logic in the disk disruption scenario so that it returns the right set of disks to be off-lined.

Signed-off-by: Yogananth Subramanian <ysubrama@redhat.com>
2025-07-07 13:49:33 -04:00
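The one-line fix in `bm_node_scenarios.py` (shown in the diff below) inverts the filtering: instead of off-lining every node disk the user did not ask for, it off-lines only the requested disks that actually exist on the node. A minimal illustration with hypothetical disk names (placeholders, not from the repository):

```python
# Hypothetical disk lists illustrating the corrected selection logic
node_disks = ["sda", "sdb", "sdc"]   # disks discovered on the node
user_disks = ["sdb", "sdx"]          # disks requested by the user

# Old (buggy): returned every node disk NOT requested by the user
buggy = [disk for disk in node_disks if disk not in user_disks]

# Fixed: off-line only the requested disks that are present on the node,
# falling back to all node disks when none of the requested ones match
offline_disks = [disk for disk in user_disks if disk in node_disks]
result = offline_disks if offline_disks else node_disks
```

With these inputs the buggy expression yields `sda` and `sdc` (disks the user never asked to disrupt), while the fixed one yields only `sdb`.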
17 changed files with 751 additions and 48 deletions

View File

@@ -21,27 +21,7 @@ jobs:
helm repo add stable https://charts.helm.sh/stable
helm repo update
- name: Deploy prometheus & Port Forwarding
run: |
kubectl create namespace prometheus-k8s
helm install \
--wait --timeout 360s \
kind-prometheus \
prometheus-community/kube-prometheus-stack \
--namespace prometheus-k8s \
--set prometheus.service.nodePort=30000 \
--set prometheus.service.type=NodePort \
--set grafana.service.nodePort=31000 \
--set grafana.service.type=NodePort \
--set alertmanager.service.nodePort=32000 \
--set alertmanager.service.type=NodePort \
--set prometheus-node-exporter.service.nodePort=32001 \
--set prometheus-node-exporter.service.type=NodePort \
--set prometheus.prometheusSpec.maximumStartupDurationSeconds=300
SELECTOR=`kubectl -n prometheus-k8s get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'`
POD_NAME=`kubectl -n prometheus-k8s get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'`
kubectl -n prometheus-k8s port-forward $POD_NAME 9090:9090 &
sleep 5
uses: redhat-chaos/actions/prometheus@main
- name: Install Python
uses: actions/setup-python@v4
with:
@@ -89,6 +69,7 @@ jobs:
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Push on main only steps + all other functional to collect coverage
@@ -119,6 +100,7 @@ jobs:
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_memory_hog" >> ./CI/tests/functional_tests
echo "test_io_hog" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
# Final common steps
- name: Run Functional tests

View File

@@ -0,0 +1,29 @@
apiVersion: v1
kind: Pod
metadata:
name: pod-network-filter-test
labels:
app.kubernetes.io/name: pod-network-filter
spec:
containers:
- name: nginx
image: quay.io/krkn-chaos/krkn-funtests:pod-network-filter
ports:
- containerPort: 5000
name: pod-network-prt
---
apiVersion: v1
kind: Service
metadata:
name: pod-network-filter-service
spec:
selector:
app.kubernetes.io/name: pod-network-filter
type: NodePort
ports:
- name: pod-network-filter-svc
protocol: TCP
port: 80
targetPort: pod-network-prt
nodePort: 30037

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_cpu_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/cpu-hog.yml"

View File

@@ -5,12 +5,13 @@ source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_io_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/io-hog.yml"
export post_config=""
cat $scenario_file
envsubst < CI/config/common_test_config.yaml > CI/config/io_hog.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/io_hog.yaml
echo "IO Hog: Success"

View File

@@ -7,7 +7,7 @@ trap finish EXIT
function functional_test_memory_hog {
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
export scenario_type="hog_scenarios"
export scenario_file="scenarios/kube/memory-hog.yml"
export post_config=""

View File

@@ -0,0 +1,59 @@
function functional_pod_network_filter {
export SERVICE_URL="http://localhost:8889"
export scenario_type="network_chaos_ng_scenarios"
export scenario_file="scenarios/kube/pod-network-filter.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_network_filter.yaml
yq -i '.[0].test_duration=10' scenarios/kube/pod-network-filter.yml
yq -i '.[0].label_selector=""' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ingress=false' scenarios/kube/pod-network-filter.yml
yq -i '.[0].egress=true' scenarios/kube/pod-network-filter.yml
yq -i '.[0].target="pod-network-filter-test"' scenarios/kube/pod-network-filter.yml
yq -i '.[0].protocols=["tcp"]' scenarios/kube/pod-network-filter.yml
yq -i '.[0].ports=[443]' scenarios/kube/pod-network-filter.yml
## Test webservice deployment
kubectl apply -f ./CI/templates/pod_network_filter.yaml
COUNTER=0
while true
do
curl $SERVICE_URL
EXITSTATUS=$?
if [ "$EXITSTATUS" -eq "0" ]
then
break
fi
sleep 1
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retries reached, test failed" && exit 1
done
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > /dev/null 2>&1 &
PID=$!
# wait until the dns resolution starts failing and the service returns 404
DNS_FAILURE_STATUS=0
while true
do
OUT_STATUS_CODE=$(curl -X GET -s -o /dev/null -I -w "%{http_code}" $SERVICE_URL)
if [ "$OUT_STATUS_CODE" -eq "404" ]
then
DNS_FAILURE_STATUS=404
fi
if [ "$DNS_FAILURE_STATUS" -eq "404" ] && [ "$OUT_STATUS_CODE" -eq "200" ]
then
echo "service restored"
break
fi
COUNTER=$((COUNTER+1))
[ $COUNTER -eq "100" ] && echo "maximum number of retries reached, test failed" && exit 1
sleep 2
done
wait $PID
}
functional_pod_network_filter
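The probe loop in the funtest above waits for a 404 (the network filter has cut egress) followed by a 200 (the filter was removed and the service recovered). The same polling pattern can be sketched in Python, with a hypothetical `probe` callable standing in for the `curl` status check:

```python
import time

def wait_for_outage_and_recovery(probe, max_retries=100, interval=0):
    """Poll probe() (returns an HTTP status code) until a 404 has been
    observed and the service subsequently returns 200 again; raise if
    max_retries is exhausted first."""
    seen_failure = False
    for _ in range(max_retries):
        status = probe()
        if status == 404:
            seen_failure = True        # outage detected
        if seen_failure and status == 200:
            return True                # service restored after the outage
        time.sleep(interval)
    raise TimeoutError("maximum number of retries reached, test failed")

# Usage with a scripted sequence of responses: healthy, outage, outage, restored
responses = iter([200, 404, 404, 200])
assert wait_for_outage_and_recovery(lambda: next(responses)) is True
```

This mirrors the bash script's two-phase check: `DNS_FAILURE_STATUS` plays the role of `seen_failure`, and the counter bound plays the role of `max_retries`.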

View File

@@ -47,6 +47,8 @@ kraken:
- scenarios/kube/syn_flood.yaml
- network_chaos_ng_scenarios:
- scenarios/kube/network-filter.yml
- kubevirt_vm_outage:
- scenarios/kubevirt/kubevirt-vm-outage.yaml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed

View File

@@ -5,6 +5,8 @@ nodes:
extraPortMappings:
- containerPort: 30036
hostPort: 8888
- containerPort: 30037
hostPort: 8889
- role: control-plane
- role: control-plane
- role: worker

View File

@@ -9,6 +9,7 @@ import logging
import urllib3
import sys
import json
import tempfile
import yaml
from krkn_lib.elastic.krkn_elastic import KrknElastic
@@ -251,11 +252,29 @@ def metrics(
metric[k] = v
metric['timestamp'] = str(datetime.datetime.now())
metrics_list.append(metric.copy())
if elastic:
save_metrics = False
if elastic is not None and elastic_metrics_index is not None:
result = elastic.upload_metrics_to_elasticsearch(
run_uuid=run_uuid, index=elastic_metrics_index, raw_data=metrics_list
)
if result == -1:
logging.error("failed to save metrics on ElasticSearch")
save_metrics = True
else:
save_metrics = True
if save_metrics:
local_dir = os.path.join(tempfile.gettempdir(), "krkn_metrics")
os.makedirs(local_dir, exist_ok=True)
local_file = os.path.join(local_dir, f"{elastic_metrics_index}_{run_uuid}.json")
try:
with open(local_file, "w") as f:
json.dump({
"run_uuid": run_uuid,
"metrics": metrics_list
}, f, indent=2)
logging.info(f"Metrics saved to {local_file}")
except Exception as e:
logging.error(f"Failed to save metrics to {local_file}: {e}")
return metrics_list
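The fallback added above can be exercised in isolation. A minimal sketch of the same pattern, with a hypothetical `upload` callable standing in for the Elasticsearch client (which returns -1 on failure, as in the diff):

```python
import json
import logging
import os
import tempfile

def save_metrics_with_fallback(run_uuid, metrics_list, upload=None, index="krkn-metrics"):
    """Try the remote sink first; on failure (or when no sink is configured)
    persist the metrics to a local JSON file and return its path."""
    if upload is not None:
        if upload(run_uuid, index, metrics_list) != -1:
            return None  # remote upload succeeded, no local copy needed
        logging.error("failed to save metrics on ElasticSearch")
    # Local fallback, mirroring the tempdir/krkn_metrics layout in the diff
    local_dir = os.path.join(tempfile.gettempdir(), "krkn_metrics")
    os.makedirs(local_dir, exist_ok=True)
    local_file = os.path.join(local_dir, f"{index}_{run_uuid}.json")
    with open(local_file, "w") as f:
        json.dump({"run_uuid": run_uuid, "metrics": metrics_list}, f, indent=2)
    return local_file
```

Either way the caller still receives the metrics list; the local file is purely a durability net for when Elasticsearch is unreachable or unconfigured.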

View File

@@ -0,0 +1,399 @@
import logging
import time
from typing import Dict, Any, Optional
import random
import re
import yaml
from kubernetes.client.rest import ApiException
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import log_exception
from krkn_lib.models.k8s import AffectedPod, PodsStatus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
"""
A scenario plugin that injects chaos by deleting a KubeVirt Virtual Machine Instance (VMI).
This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
"""
def __init__(self):
self.k8s_client = None
self.original_vmi = None
# Scenario type is handled directly in execute_scenario
def get_scenario_types(self) -> list[str]:
return ["kubevirt_vm_outage"]
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
"""
Main entry point for the plugin.
Parses the scenario configuration and executes the chaos scenario.
"""
try:
with open(scenario, "r") as f:
scenario_config = yaml.full_load(f)
self.init_clients(lib_telemetry.get_lib_kubernetes())
pods_status = PodsStatus()
for config in scenario_config["scenarios"]:
if config.get("scenario") == "kubevirt_vm_outage":
single_pods_status = self.execute_scenario(config, scenario_telemetry)
pods_status.merge(single_pods_status)
scenario_telemetry.affected_pods = pods_status
return 0
except Exception as e:
logging.error(f"KubeVirt VM Outage scenario failed: {e}")
log_exception(e)
return 1
def init_clients(self, k8s_client: KrknKubernetes):
"""
Initialize Kubernetes client for KubeVirt operations.
"""
self.k8s_client = k8s_client
self.custom_object_client = k8s_client.custom_object_client
logging.info("Successfully initialized Kubernetes client for KubeVirt operations")
def get_vmi(self, name: str, namespace: str) -> Optional[Dict]:
"""
Get a Virtual Machine Instance by name and namespace.
:param name: Name of the VMI to retrieve
:param namespace: Namespace of the VMI
:return: The VMI object if found, None otherwise
"""
try:
vmi = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=name
)
return vmi
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {name}: {e}")
raise
def get_vmis(self, regex_name: str, namespace: str) -> Optional[Dict]:
"""
List Virtual Machine Instances in a namespace whose names match a regex.
:param regex_name: Regular expression matched against VMI names
:param namespace: Namespace to search
:return: A list of matching VMI objects, or None if the API returns 404
"""
try:
vmis = self.custom_object_client.list_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
)
vmi_list = []
for vmi in vmis.get("items"):
vmi_name = vmi.get("metadata",{}).get("name")
match = re.match(regex_name, vmi_name)
if match:
vmi_list.append(vmi)
return vmi_list
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
return None
else:
logging.error(f"Error getting VMI {regex_name}: {e}")
raise
except Exception as e:
logging.error(f"Unexpected error getting VMI {regex_name}: {e}")
raise
def execute_scenario(self, config: Dict[str, Any], scenario_telemetry: ScenarioTelemetry) -> int:
"""
Execute a KubeVirt VM outage scenario based on the provided configuration.
:param config: The scenario configuration
:param scenario_telemetry: The telemetry object for recording metrics
:return: A PodsStatus object on success, 1 on failure
"""
try:
params = config.get("parameters", {})
vm_name = params.get("vm_name")
namespace = params.get("namespace", "default")
timeout = params.get("timeout", 60)
kill_count = params.get("kill_count", 1)
disable_auto_restart = params.get("disable_auto_restart", False)
self.pods_status = PodsStatus()
if not vm_name:
logging.error("vm_name parameter is required")
return 1
vmis_list = self.get_vmis(vm_name,namespace)
rand_int = random.randint(0, len(vmis_list) - 1)
vmi = vmis_list[rand_int]
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
vmi_name = vmi.get("metadata").get("name")
if not self.validate_environment(vmi_name, namespace):
return 1
vmi = self.get_vmi(vmi_name, namespace)
self.affected_pod = AffectedPod(
pod_name=vmi_name,
namespace=namespace,
)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return 1
self.original_vmi = vmi
logging.info(f"Captured initial state of VMI: {vm_name}")
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
if result != 0:
return self.pods_status
result = self.wait_for_running(vmi_name,namespace, timeout)
if result != 0:
self.recover(vmi_name, namespace)
self.pods_status.unrecovered = self.affected_pod
return self.pods_status
self.affected_pod.total_recovery_time = (
self.affected_pod.pod_readiness_time
+ self.affected_pod.pod_rescheduling_time
)
self.pods_status.recovered.append(self.affected_pod)
logging.info(f"Successfully completed KubeVirt VM outage scenario for VM: {vm_name}")
return self.pods_status
except Exception as e:
logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
log_exception(e)
return 1
def validate_environment(self, vm_name: str, namespace: str) -> bool:
"""
Validate that KubeVirt is installed and the specified VM exists.
:param vm_name: Name of the VM to validate
:param namespace: Namespace of the VM
:return: True if environment is valid, False otherwise
"""
try:
# Check if KubeVirt CRDs exist
crd_list = self.custom_object_client.list_namespaced_custom_object("kubevirt.io","v1",namespace,"virtualmachines")
kubevirt_crds = [crd for crd in crd_list.get("items", [])]
if not kubevirt_crds:
logging.error("KubeVirt CRDs not found. Ensure KubeVirt/CNV is installed in the cluster")
return False
# Check if VMI exists
vmi = self.get_vmi(vm_name, namespace)
if not vmi:
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
return False
logging.info(f"Validated environment: KubeVirt is installed and VMI {vm_name} exists")
return True
except Exception as e:
logging.error(f"Error validating environment: {e}")
return False
def patch_vm_spec(self, vm_name: str, namespace: str, running: bool) -> bool:
"""
Patch the VM spec to enable/disable auto-restart.
:param vm_name: Name of the VM to patch
:param namespace: Namespace of the VM
:param running: Whether the VM should be set to running state
:return: True if patch was successful, False otherwise
"""
try:
# Get the VM object first to get its current spec
vm = self.custom_object_client.get_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name
)
# Update the running state
if 'spec' not in vm:
vm['spec'] = {}
vm['spec']['running'] = running
# Apply the patch
self.custom_object_client.patch_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachines",
name=vm_name,
body=vm
)
return True
except ApiException as e:
logging.error(f"Failed to patch VM {vm_name}: {e}")
return False
except Exception as e:
logging.error(f"Unexpected error patching VM {vm_name}: {e}")
return False
def delete_vmi(self, vm_name: str, namespace: str, disable_auto_restart: bool = False, timeout: int = 120) -> int:
"""
Delete a Virtual Machine Instance to simulate a VM outage.
:param vm_name: Name of the VMI to delete
:param namespace: Namespace of the VMI
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Injecting chaos: Deleting VMI {vm_name} in namespace {namespace}")
# If auto-restart should be disabled, patch the VM spec first
if disable_auto_restart:
logging.info(f"Disabling auto-restart for VM {vm_name} by setting spec.running=False")
if not self.patch_vm_spec(vm_name, namespace, running=False):
logging.error("Failed to disable auto-restart for VM"
" - proceeding with deletion but VM may auto-restart")
start_creation_time = self.original_vmi.get('metadata', {}).get('creationTimestamp')
start_time = time.time()
try:
self.custom_object_client.delete_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
name=vm_name
)
except ApiException as e:
if e.status == 404:
logging.warning(f"VMI {vm_name} not found during deletion")
return 1
else:
logging.error(f"API error during VMI deletion: {e}")
return 1
# Wait for the VMI to be deleted
while time.time() - start_time < timeout:
deleted_vmi = self.get_vmi(vm_name, namespace)
if deleted_vmi:
if start_creation_time != deleted_vmi.get('metadata', {}).get('creationTimestamp'):
logging.info(f"VMI {vm_name} successfully recreated")
self.affected_pod.pod_rescheduling_time = time.time() - start_time
return 0
else:
logging.info(f"VMI {vm_name} successfully deleted")
time.sleep(1)
logging.error(f"Timed out waiting for VMI {vm_name} to be deleted")
self.pods_status.unrecovered = self.affected_pod
return 1
except Exception as e:
logging.error(f"Error deleting VMI {vm_name}: {e}")
log_exception(e)
self.pods_status.unrecovered = self.affected_pod
return 1
def wait_for_running(self, vm_name: str, namespace: str, timeout: int = 120) -> int:
start_time = time.time()
while time.time() - start_time < timeout:
# Poll the VMI state until it reaches Running or the timeout expires
vmi = self.get_vmi(vm_name, namespace)
if vmi:
if vmi.get('status', {}).get('phase') == "Running":
end_time = time.time()
self.affected_pod.pod_readiness_time = end_time - start_time
logging.info(f"VMI {vm_name} is running")
return 0
logging.info(f"VMI {vm_name} exists but is not in Running state. Current state: {vmi.get('status', {}).get('phase')}")
else:
logging.info(f"VMI {vm_name} not yet recreated")
time.sleep(1)
return 1
def recover(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
"""
Recover a deleted VMI, either by waiting for auto-recovery or manually recreating it.
:param vm_name: Name of the VMI to recover
:param namespace: Namespace of the VMI
:param disable_auto_restart: Whether auto-restart was disabled during injection
:return: 0 for success, 1 for failure
"""
try:
logging.info(f"Attempting to recover VMI {vm_name} in namespace {namespace}")
if self.original_vmi:
logging.info(f"Auto-recovery didn't occur for VMI {vm_name}. Attempting manual recreation")
try:
# Clean up server-generated fields
vmi_dict = self.original_vmi.copy()
if 'metadata' in vmi_dict:
metadata = vmi_dict['metadata']
for field in ['resourceVersion', 'uid', 'creationTimestamp', 'generation']:
if field in metadata:
del metadata[field]
# Create the VMI
self.custom_object_client.create_namespaced_custom_object(
group="kubevirt.io",
version="v1",
namespace=namespace,
plural="virtualmachineinstances",
body=vmi_dict
)
logging.info(f"Successfully recreated VMI {vm_name}")
# Wait for VMI to start running
if self.wait_for_running(vm_name, namespace) != 0:
logging.warning(f"VMI {vm_name} was recreated but didn't reach Running state in time")
return 0  # Still consider it a success as the VMI was recreated
except Exception as e:
logging.error(f"Error recreating VMI {vm_name}: {e}")
log_exception(e)
return 1
else:
logging.error(f"Failed to recover VMI {vm_name}: No original state captured and auto-recovery did not occur")
return 1
except Exception as e:
logging.error(f"Unexpected error recovering VMI {vm_name}: {e}")
log_exception(e)
return 1

View File

@@ -274,7 +274,7 @@ done'''
logging.info("Disk response: %s" % (disk_response))
node_disks = [disk for disk in disk_response.split("\n") if disk]
logging.info("Node disks: %s" % (node_disks))
offline_disks = [disk for disk in node_disks if disk not in user_disks]
offline_disks = [disk for disk in user_disks if disk in node_disks]
return offline_disks if offline_disks else node_disks
except Exception as e:
logging.error(

View File

@@ -1,16 +1,10 @@
import datetime
import time
import random
import logging
import paramiko
from krkn_lib.models.k8s import AffectedNode
import krkn.invoke.command as runcommand
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
from krkn_lib.models.k8s import AffectedNode
node_general = False
def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
killable_nodes = kubecli.list_killable_nodes()
@@ -65,14 +59,6 @@ def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_nod
return affected_node
# Get the ip of the cluster node
def get_node_ip(node):
return runcommand.invoke(
"kubectl get node %s -o "
"jsonpath='{.status.addresses[?(@.type==\"InternalIP\")].address}'" % (node)
)
def check_service_status(node, service, ssh_private_key, timeout):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

View File

@@ -1,14 +1,16 @@
- id: node_network_filter
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 300
test_duration: 100
label_selector: "kubernetes.io/hostname=minikube"
wait_duration: 1
test_duration: 10
label_selector: "<node_selector>"
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: ''
target: '<node_name>'
interfaces: []
ports:
- 53
- 2309
protocols:
- tcp

View File

@@ -2,13 +2,13 @@
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: "app=network-attacked"
label_selector: "<pod_selector>"
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: ""
target: "<pod_name>"
interfaces: []
protocols:
- tcp

View File

@@ -0,0 +1,7 @@
scenarios:
- name: "kubevirt outage test"
scenario: kubevirt_vm_outage
parameters:
vm_name: <vm-name>
namespace: <namespace>
timeout: 60
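For reference, the plugin's `run()` loads this file with `yaml.full_load` and then reads each scenario's parameters with the defaults visible in the plugin source. A minimal sketch of that parameter extraction over the equivalent parsed dict (values here are placeholders):

```python
# The scenario file above, as the dict yaml.full_load() would produce
config = {
    "scenarios": [
        {
            "name": "kubevirt outage test",
            "scenario": "kubevirt_vm_outage",
            "parameters": {"vm_name": "my-vm", "namespace": "vms", "timeout": 60},
        }
    ]
}

for cfg in config["scenarios"]:
    if cfg.get("scenario") == "kubevirt_vm_outage":
        params = cfg.get("parameters", {})
        vm_name = params.get("vm_name")                     # required; plugin errors if missing
        namespace = params.get("namespace", "default")      # defaults mirror execute_scenario
        timeout = params.get("timeout", 60)
        kill_count = params.get("kill_count", 1)
        disable_auto_restart = params.get("disable_auto_restart", False)
```

Only `vm_name` is mandatory; everything else falls back to the defaults shown, which is why the sample scenario above can stay this small.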

View File

@@ -0,0 +1,215 @@
import unittest
import time
from unittest.mock import MagicMock, patch
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for KubevirtVmOutageScenarioPlugin
"""
self.plugin = KubevirtVmOutageScenarioPlugin()
# Create mock k8s client
self.k8s_client = MagicMock()
self.custom_object_client = MagicMock()
self.k8s_client.custom_object_client = self.custom_object_client
self.plugin.k8s_client = self.k8s_client
# Mock methods needed for KubeVirt operations
self.k8s_client.list_custom_resource_definition = MagicMock()
# Mock custom resource definition list with KubeVirt CRDs
crd_list = MagicMock()
crd_item = MagicMock()
crd_item.spec = MagicMock()
crd_item.spec.group = "kubevirt.io"
crd_list.items = [crd_item]
self.k8s_client.list_custom_resource_definition.return_value = crd_list
# Mock VMI data
self.mock_vmi = {
"metadata": {
"name": "test-vm",
"namespace": "default"
},
"status": {
"phase": "Running"
}
}
# Create test config
self.config = {
"scenarios": [
{
"name": "kubevirt outage test",
"scenario": "kubevirt_vm_outage",
"parameters": {
"vm_name": "test-vm",
"namespace": "default",
"duration": 0
}
}
]
}
# Create a temporary config file
import tempfile, os
temp_dir = tempfile.gettempdir()
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
with open(self.scenario_file, "w") as f:
yaml.dump(self.config, f)
# Mock dependencies
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
def test_successful_injection_and_recovery(self):
"""
Test successful deletion and recovery of a VMI
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject and recover to simulate success
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_called_once_with("test-vm", "default", False)
def test_injection_failure(self):
"""
Test failure during VMI deletion
"""
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock inject to simulate failure
with patch.object(self.plugin, 'inject', return_value=1) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
mock_inject.assert_called_once_with("test-vm", "default", False)
mock_recover.assert_not_called()
def test_disable_auto_restart(self):
"""
Test VM auto-restart can be disabled
"""
# Configure test with disable_auto_restart=True
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
# Mock VM object for patching
mock_vm = {
"metadata": {"name": "test-vm", "namespace": "default"},
"spec": {}
}
# Mock get_vmi to return our mock VMI
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Mock VM patch operation
with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm:
mock_patch_vm.return_value = True
# Mock inject and recover
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 0)
# Should call patch_vm_spec to disable auto-restart
mock_patch_vm.assert_any_call("test-vm", "default", False)
# Should call patch_vm_spec to re-enable auto-restart during recovery
mock_patch_vm.assert_any_call("test-vm", "default", True)
mock_inject.assert_called_once_with("test-vm", "default", True)
mock_recover.assert_called_once_with("test-vm", "default", True)
def test_recovery_when_vmi_does_not_exist(self):
"""
Test recovery logic when VMI does not exist after deletion
"""
# Store the original VMI in the plugin for recovery
self.plugin.original_vmi = self.mock_vmi.copy()
# Create a cleaned vmi_dict as the plugin would
vmi_dict = self.mock_vmi.copy()
# Set up running VMI data for after recovery
running_vmi = {
"metadata": {"name": "test-vm", "namespace": "default"},
"status": {"phase": "Running"}
}
# Set up time.time to immediately exceed the timeout for auto-recovery
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
# Mock get_vmi to always return None (not auto-recovered)
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
# Mock the custom object API to return success
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
# Run recovery with mocked time.sleep
with patch('time.sleep'):
result = self.plugin.recover("test-vm", "default", False)
self.assertEqual(result, 0)
# Verify create was called with the right arguments for our API version and kind
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
body=vmi_dict
)
def test_validation_failure(self):
"""
Test validation failure when KubeVirt is not installed
"""
# Mock empty CRD list (no KubeVirt CRDs)
empty_crd_list = MagicMock()
empty_crd_list.items = []
self.k8s_client.list_custom_resource_definition.return_value = empty_crd_list
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
self.assertEqual(result, 1)
def test_delete_vmi_timeout(self):
"""
Test timeout during VMI deletion
"""
# Mock successful delete operation
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
# Mock that get_vmi always returns VMI (never gets deleted)
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
# Simulate timeout by making time.time return values that exceed the timeout
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
result = self.plugin.inject("test-vm", "default", False)
self.assertEqual(result, 1)
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
group="kubevirt.io",
version="v1",
namespace="default",
plural="virtualmachineinstances",
name="test-vm"
)
if __name__ == "__main__":
unittest.main()