mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-02-19 20:40:33 +00:00
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e5c8c55a4 | ||
|
|
9d9a6f9b80 | ||
|
|
f8fe2ae5b7 | ||
|
|
77b1dd32c7 | ||
|
|
9df727ccf5 | ||
|
|
70c8fec705 | ||
|
|
0731144a6b | ||
|
|
9337052e7b | ||
|
|
dc8d7ad75b | ||
|
|
1cc44e1f18 | ||
|
|
c8190fd1c1 | ||
|
|
9078b35e46 | ||
|
|
e6b1665aa1 | ||
|
|
c56819365c | ||
|
|
6a657576cb | ||
|
|
f04f1f1101 | ||
|
|
bddbd42f8c | ||
|
|
630dbd805b | ||
|
|
10d26ba50e | ||
|
|
d47286ae21 |
26
.github/workflows/tests.yml
vendored
26
.github/workflows/tests.yml
vendored
@@ -21,27 +21,7 @@ jobs:
|
||||
helm repo add stable https://charts.helm.sh/stable
|
||||
helm repo update
|
||||
- name: Deploy prometheus & Port Forwarding
|
||||
run: |
|
||||
kubectl create namespace prometheus-k8s
|
||||
helm install \
|
||||
--wait --timeout 360s \
|
||||
kind-prometheus \
|
||||
prometheus-community/kube-prometheus-stack \
|
||||
--namespace prometheus-k8s \
|
||||
--set prometheus.service.nodePort=30000 \
|
||||
--set prometheus.service.type=NodePort \
|
||||
--set grafana.service.nodePort=31000 \
|
||||
--set grafana.service.type=NodePort \
|
||||
--set alertmanager.service.nodePort=32000 \
|
||||
--set alertmanager.service.type=NodePort \
|
||||
--set prometheus-node-exporter.service.nodePort=32001 \
|
||||
--set prometheus-node-exporter.service.type=NodePort \
|
||||
--set prometheus.prometheusSpec.maximumStartupDurationSeconds=300
|
||||
|
||||
SELECTOR=`kubectl -n prometheus-k8s get service kind-prometheus-kube-prome-prometheus -o wide --no-headers=true | awk '{ print $7 }'`
|
||||
POD_NAME=`kubectl -n prometheus-k8s get pods --selector="$SELECTOR" --no-headers=true | awk '{ print $1 }'`
|
||||
kubectl -n prometheus-k8s port-forward $POD_NAME 9090:9090 &
|
||||
sleep 5
|
||||
uses: redhat-chaos/actions/prometheus@main
|
||||
- name: Install Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
@@ -82,12 +62,14 @@ jobs:
|
||||
echo "test_service_hijacking" > ./CI/tests/functional_tests
|
||||
echo "test_app_outages" >> ./CI/tests/functional_tests
|
||||
echo "test_container" >> ./CI/tests/functional_tests
|
||||
echo "test_pod" >> ./CI/tests/functional_tests
|
||||
echo "test_namespace" >> ./CI/tests/functional_tests
|
||||
echo "test_net_chaos" >> ./CI/tests/functional_tests
|
||||
echo "test_time" >> ./CI/tests/functional_tests
|
||||
echo "test_cpu_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_memory_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_io_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
|
||||
|
||||
|
||||
# Push on main only steps + all other functional to collect coverage
|
||||
@@ -111,12 +93,14 @@ jobs:
|
||||
echo "test_service_hijacking" >> ./CI/tests/functional_tests
|
||||
echo "test_app_outages" >> ./CI/tests/functional_tests
|
||||
echo "test_container" >> ./CI/tests/functional_tests
|
||||
echo "test_pod" >> ./CI/tests/functional_tests
|
||||
echo "test_namespace" >> ./CI/tests/functional_tests
|
||||
echo "test_net_chaos" >> ./CI/tests/functional_tests
|
||||
echo "test_time" >> ./CI/tests/functional_tests
|
||||
echo "test_cpu_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_memory_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_io_hog" >> ./CI/tests/functional_tests
|
||||
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
|
||||
|
||||
# Final common steps
|
||||
- name: Run Functional tests
|
||||
|
||||
@@ -2,12 +2,9 @@ kraken:
|
||||
distribution: kubernetes # Distribution can be kubernetes or openshift.
|
||||
kubeconfig_path: ~/.kube/config # Path to kubeconfig.
|
||||
exit_on_failure: False # Exit when a post action scenario fails.
|
||||
litmus_version: v1.13.6 # Litmus version to install.
|
||||
litmus_uninstall: False # If you want to uninstall litmus if failure.
|
||||
chaos_scenarios: # List of policies/chaos scenarios to load.
|
||||
- $scenario_type: # List of chaos pod scenarios to load.
|
||||
- $scenario_file
|
||||
$post_config
|
||||
cerberus:
|
||||
cerberus_enabled: False # Enable it when cerberus is previously installed.
|
||||
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal.
|
||||
@@ -52,8 +49,6 @@ telemetry:
|
||||
telemetry_group: "funtests"
|
||||
elastic:
|
||||
enable_elastic: False
|
||||
collect_metrics: False
|
||||
collect_alerts: False
|
||||
verify_certs: False
|
||||
elastic_url: "https://192.168.39.196" # To track results in elasticsearch, give url to server here; will post telemetry details when url and index not blank
|
||||
elastic_port: 32766
|
||||
|
||||
29
CI/templates/pod_network_filter.yaml
Normal file
29
CI/templates/pod_network_filter.yaml
Normal file
@@ -0,0 +1,29 @@
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: pod-network-filter-test
|
||||
labels:
|
||||
app.kubernetes.io/name: pod-network-filter
|
||||
spec:
|
||||
containers:
|
||||
- name: nginx
|
||||
image: quay.io/krkn-chaos/krkn-funtests:pod-network-filter
|
||||
ports:
|
||||
- containerPort: 5000
|
||||
name: pod-network-prt
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: pod-network-filter-service
|
||||
spec:
|
||||
selector:
|
||||
app.kubernetes.io/name: pod-network-filter
|
||||
type: NodePort
|
||||
ports:
|
||||
- name: pod-network-filter-svc
|
||||
protocol: TCP
|
||||
port: 80
|
||||
targetPort: pod-network-prt
|
||||
nodePort: 30037
|
||||
@@ -7,7 +7,7 @@ trap finish EXIT
|
||||
|
||||
|
||||
function functional_test_cpu_hog {
|
||||
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
|
||||
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/cpu-hog.yml
|
||||
|
||||
export scenario_type="hog_scenarios"
|
||||
export scenario_file="scenarios/kube/cpu-hog.yml"
|
||||
|
||||
@@ -5,12 +5,13 @@ source CI/tests/common.sh
|
||||
trap error ERR
|
||||
trap finish EXIT
|
||||
|
||||
|
||||
function functional_test_io_hog {
|
||||
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
|
||||
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/io-hog.yml
|
||||
export scenario_type="hog_scenarios"
|
||||
export scenario_file="scenarios/kube/io-hog.yml"
|
||||
export post_config=""
|
||||
|
||||
cat $scenario_file
|
||||
envsubst < CI/config/common_test_config.yaml > CI/config/io_hog.yaml
|
||||
python3 -m coverage run -a run_kraken.py -c CI/config/io_hog.yaml
|
||||
echo "IO Hog: Success"
|
||||
|
||||
@@ -7,7 +7,7 @@ trap finish EXIT
|
||||
|
||||
|
||||
function functional_test_memory_hog {
|
||||
yq -i '.node_selector="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
|
||||
yq -i '."node-selector"="kubernetes.io/hostname=kind-worker2"' scenarios/kube/memory-hog.yml
|
||||
export scenario_type="hog_scenarios"
|
||||
export scenario_file="scenarios/kube/memory-hog.yml"
|
||||
export post_config=""
|
||||
|
||||
18
CI/tests/test_pod.sh
Executable file
18
CI/tests/test_pod.sh
Executable file
@@ -0,0 +1,18 @@
|
||||
set -xeEo pipefail
|
||||
|
||||
source CI/tests/common.sh
|
||||
|
||||
trap error ERR
|
||||
trap finish EXIT
|
||||
|
||||
function functional_test_pod_crash {
|
||||
export scenario_type="pod_disruption_scenarios"
|
||||
export scenario_file="scenarios/kind/pod_etcd.yml"
|
||||
export post_config=""
|
||||
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
|
||||
|
||||
python3 -m coverage run -a run_kraken.py -c CI/config/pod_config.yaml
|
||||
echo "Pod disruption scenario test: Success"
|
||||
}
|
||||
|
||||
functional_test_pod_crash
|
||||
59
CI/tests/test_pod_network_filter.sh
Executable file
59
CI/tests/test_pod_network_filter.sh
Executable file
@@ -0,0 +1,59 @@
|
||||
function functional_pod_network_filter {
|
||||
export SERVICE_URL="http://localhost:8889"
|
||||
export scenario_type="network_chaos_ng_scenarios"
|
||||
export scenario_file="scenarios/kube/pod-network-filter.yml"
|
||||
export post_config=""
|
||||
envsubst < CI/config/common_test_config.yaml > CI/config/pod_network_filter.yaml
|
||||
yq -i '.[0].test_duration=10' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].label_selector=""' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].ingress=false' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].egress=true' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].target="pod-network-filter-test"' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].protocols=["tcp"]' scenarios/kube/pod-network-filter.yml
|
||||
yq -i '.[0].ports=[443]' scenarios/kube/pod-network-filter.yml
|
||||
|
||||
|
||||
## Test webservice deployment
|
||||
kubectl apply -f ./CI/templates/pod_network_filter.yaml
|
||||
COUNTER=0
|
||||
while true
|
||||
do
|
||||
curl $SERVICE_URL
|
||||
EXITSTATUS=$?
|
||||
if [ "$EXITSTATUS" -eq "0" ]
|
||||
then
|
||||
break
|
||||
fi
|
||||
sleep 1
|
||||
COUNTER=$((COUNTER+1))
|
||||
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
|
||||
done
|
||||
|
||||
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_filter.yaml > /dev/null 2>&1 &
|
||||
PID=$!
|
||||
|
||||
# wait until the dns resolution starts failing and the service returns 400
|
||||
DNS_FAILURE_STATUS=0
|
||||
while true
|
||||
do
|
||||
OUT_STATUS_CODE=$(curl -X GET -s -o /dev/null -I -w "%{http_code}" $SERVICE_URL)
|
||||
if [ "$OUT_STATUS_CODE" -eq "404" ]
|
||||
then
|
||||
DNS_FAILURE_STATUS=404
|
||||
fi
|
||||
|
||||
if [ "$DNS_FAILURE_STATUS" -eq "404" ] && [ "$OUT_STATUS_CODE" -eq "200" ]
|
||||
then
|
||||
echo "service restored"
|
||||
break
|
||||
fi
|
||||
COUNTER=$((COUNTER+1))
|
||||
[ $COUNTER -eq "100" ] && echo "maximum number of retry reached, test failed" && exit 1
|
||||
sleep 2
|
||||
done
|
||||
|
||||
wait $PID
|
||||
}
|
||||
|
||||
functional_pod_network_filter
|
||||
|
||||
@@ -47,6 +47,8 @@ kraken:
|
||||
- scenarios/kube/syn_flood.yaml
|
||||
- network_chaos_ng_scenarios:
|
||||
- scenarios/kube/network-filter.yml
|
||||
- kubevirt_vm_outage:
|
||||
- scenarios/kubevirt/kubevirt-vm-outage.yaml
|
||||
|
||||
cerberus:
|
||||
cerberus_enabled: False # Enable it when cerberus is previously installed
|
||||
|
||||
@@ -5,6 +5,8 @@ nodes:
|
||||
extraPortMappings:
|
||||
- containerPort: 30036
|
||||
hostPort: 8888
|
||||
- containerPort: 30037
|
||||
hostPort: 8889
|
||||
- role: control-plane
|
||||
- role: control-plane
|
||||
- role: worker
|
||||
|
||||
@@ -9,6 +9,7 @@ import logging
|
||||
import urllib3
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
import yaml
|
||||
from krkn_lib.elastic.krkn_elastic import KrknElastic
|
||||
@@ -251,11 +252,29 @@ def metrics(
|
||||
metric[k] = v
|
||||
metric['timestamp'] = str(datetime.datetime.now())
|
||||
metrics_list.append(metric.copy())
|
||||
if elastic:
|
||||
|
||||
save_metrics = False
|
||||
if elastic is not None and elastic_metrics_index is not None:
|
||||
result = elastic.upload_metrics_to_elasticsearch(
|
||||
run_uuid=run_uuid, index=elastic_metrics_index, raw_data=metrics_list
|
||||
)
|
||||
if result == -1:
|
||||
logging.error("failed to save metrics on ElasticSearch")
|
||||
save_metrics = True
|
||||
else:
|
||||
save_metrics = True
|
||||
if save_metrics:
|
||||
local_dir = os.path.join(tempfile.gettempdir(), "krkn_metrics")
|
||||
os.makedirs(local_dir, exist_ok=True)
|
||||
local_file = os.path.join(local_dir, f"{elastic_metrics_index}_{run_uuid}.json")
|
||||
|
||||
try:
|
||||
with open(local_file, "w") as f:
|
||||
json.dump({
|
||||
"run_uuid": run_uuid,
|
||||
"metrics": metrics_list
|
||||
}, f, indent=2)
|
||||
logging.info(f"Metrics saved to {local_file}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to save metrics to {local_file}: {e}")
|
||||
return metrics_list
|
||||
|
||||
@@ -117,3 +117,5 @@ class AbstractScenarioPlugin(ABC):
|
||||
logging.info(f"wating {wait_duration} before running the next scenario")
|
||||
time.sleep(wait_duration)
|
||||
return failed_scenarios, scenario_telemetries
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_yaml_item_value
|
||||
|
||||
from krkn import cerberus
|
||||
|
||||
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
|
||||
|
||||
|
||||
@@ -44,7 +44,6 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
|
||||
return 1
|
||||
scenario_telemetry.affected_pods = result
|
||||
|
||||
# publish cerberus status
|
||||
except (RuntimeError, Exception):
|
||||
logging.error("ContainerScenarioPlugin exiting due to Exception %s")
|
||||
return 1
|
||||
@@ -63,6 +62,7 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
|
||||
namespace_pattern=namespace_pattern,
|
||||
label_selector=label_selector,
|
||||
max_timeout=recovery_time,
|
||||
field_selector="status.phase=Running"
|
||||
)
|
||||
|
||||
def container_killing_in_pod(self, cont_scenario, kubecli: KrknKubernetes):
|
||||
|
||||
@@ -0,0 +1,399 @@
|
||||
import logging
|
||||
import time
|
||||
from typing import Dict, Any, Optional
|
||||
import random
|
||||
import re
|
||||
import yaml
|
||||
from kubernetes.client.rest import ApiException
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import log_exception
|
||||
from krkn_lib.models.k8s import AffectedPod, PodsStatus
|
||||
|
||||
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
|
||||
|
||||
|
||||
class KubevirtVmOutageScenarioPlugin(AbstractScenarioPlugin):
|
||||
"""
|
||||
A scenario plugin that injects chaos by deleting a KubeVirt Virtual Machine Instance (VMI).
|
||||
This plugin simulates a VM crash or outage scenario and supports automated or manual recovery.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.k8s_client = None
|
||||
self.original_vmi = None
|
||||
|
||||
# Scenario type is handled directly in execute_scenario
|
||||
def get_scenario_types(self) -> list[str]:
|
||||
return ["kubevirt_vm_outage"]
|
||||
|
||||
def run(
|
||||
self,
|
||||
run_uuid: str,
|
||||
scenario: str,
|
||||
krkn_config: dict[str, any],
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
scenario_telemetry: ScenarioTelemetry,
|
||||
) -> int:
|
||||
"""
|
||||
Main entry point for the plugin.
|
||||
Parses the scenario configuration and executes the chaos scenario.
|
||||
"""
|
||||
try:
|
||||
with open(scenario, "r") as f:
|
||||
scenario_config = yaml.full_load(f)
|
||||
|
||||
self.init_clients(lib_telemetry.get_lib_kubernetes())
|
||||
pods_status = PodsStatus()
|
||||
for config in scenario_config["scenarios"]:
|
||||
if config.get("scenario") == "kubevirt_vm_outage":
|
||||
single_pods_status = self.execute_scenario(config, scenario_telemetry)
|
||||
pods_status.merge(single_pods_status)
|
||||
|
||||
scenario_telemetry.affected_pods = pods_status
|
||||
|
||||
return 0
|
||||
except Exception as e:
|
||||
logging.error(f"KubeVirt VM Outage scenario failed: {e}")
|
||||
log_exception(e)
|
||||
return 1
|
||||
|
||||
def init_clients(self, k8s_client: KrknKubernetes):
|
||||
"""
|
||||
Initialize Kubernetes client for KubeVirt operations.
|
||||
"""
|
||||
self.k8s_client = k8s_client
|
||||
self.custom_object_client = k8s_client.custom_object_client
|
||||
logging.info("Successfully initialized Kubernetes client for KubeVirt operations")
|
||||
|
||||
def get_vmi(self, name: str, namespace: str) -> Optional[Dict]:
|
||||
"""
|
||||
Get a Virtual Machine Instance by name and namespace.
|
||||
|
||||
:param name: Name of the VMI to retrieve
|
||||
:param namespace: Namespace of the VMI
|
||||
:return: The VMI object if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
vmi = self.custom_object_client.get_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachineinstances",
|
||||
name=name
|
||||
)
|
||||
return vmi
|
||||
except ApiException as e:
|
||||
if e.status == 404:
|
||||
logging.warning(f"VMI {name} not found in namespace {namespace}")
|
||||
return None
|
||||
else:
|
||||
logging.error(f"Error getting VMI {name}: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error getting VMI {name}: {e}")
|
||||
raise
|
||||
|
||||
def get_vmis(self, regex_name: str, namespace: str) -> Optional[Dict]:
|
||||
"""
|
||||
Get a Virtual Machine Instance by name and namespace.
|
||||
|
||||
:param name: Name of the VMI to retrieve
|
||||
:param namespace: Namespace of the VMI
|
||||
:return: The VMI object if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
vmis = self.custom_object_client.list_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachineinstances",
|
||||
)
|
||||
|
||||
vmi_list = []
|
||||
for vmi in vmis.get("items"):
|
||||
vmi_name = vmi.get("metadata",{}).get("name")
|
||||
match = re.match(regex_name, vmi_name)
|
||||
if match:
|
||||
vmi_list.append(vmi)
|
||||
return vmi_list
|
||||
except ApiException as e:
|
||||
if e.status == 404:
|
||||
logging.warning(f"VMI {regex_name} not found in namespace {namespace}")
|
||||
return None
|
||||
else:
|
||||
logging.error(f"Error getting VMI {regex_name}: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error getting VMI {regex_name}: {e}")
|
||||
raise
|
||||
|
||||
def execute_scenario(self, config: Dict[str, Any], scenario_telemetry: ScenarioTelemetry) -> int:
|
||||
"""
|
||||
Execute a KubeVirt VM outage scenario based on the provided configuration.
|
||||
|
||||
:param config: The scenario configuration
|
||||
:param scenario_telemetry: The telemetry object for recording metrics
|
||||
:return: 0 for success, 1 for failure
|
||||
"""
|
||||
try:
|
||||
params = config.get("parameters", {})
|
||||
vm_name = params.get("vm_name")
|
||||
namespace = params.get("namespace", "default")
|
||||
timeout = params.get("timeout", 60)
|
||||
kill_count = params.get("kill_count", 1)
|
||||
disable_auto_restart = params.get("disable_auto_restart", False)
|
||||
self.pods_status = PodsStatus()
|
||||
if not vm_name:
|
||||
logging.error("vm_name parameter is required")
|
||||
return 1
|
||||
vmis_list = self.get_vmis(vm_name,namespace)
|
||||
rand_int = random.randint(0, len(vmis_list) - 1)
|
||||
vmi = vmis_list[rand_int]
|
||||
|
||||
logging.info(f"Starting KubeVirt VM outage scenario for VM: {vm_name} in namespace: {namespace}")
|
||||
vmi_name = vmi.get("metadata").get("name")
|
||||
if not self.validate_environment(vmi_name, namespace):
|
||||
return 1
|
||||
|
||||
vmi = self.get_vmi(vmi_name, namespace)
|
||||
self.affected_pod = AffectedPod(
|
||||
pod_name=vmi_name,
|
||||
namespace=namespace,
|
||||
)
|
||||
if not vmi:
|
||||
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
|
||||
return 1
|
||||
|
||||
self.original_vmi = vmi
|
||||
logging.info(f"Captured initial state of VMI: {vm_name}")
|
||||
result = self.delete_vmi(vmi_name, namespace, disable_auto_restart)
|
||||
if result != 0:
|
||||
|
||||
return self.pods_status
|
||||
|
||||
result = self.wait_for_running(vmi_name,namespace, timeout)
|
||||
if result != 0:
|
||||
self.recover(vmi_name, namespace)
|
||||
self.pods_status.unrecovered = self.affected_pod
|
||||
return self.pods_status
|
||||
|
||||
self.affected_pod.total_recovery_time = (
|
||||
self.affected_pod.pod_readiness_time
|
||||
+ self.affected_pod.pod_rescheduling_time
|
||||
)
|
||||
|
||||
self.pods_status.recovered.append(self.affected_pod)
|
||||
logging.info(f"Successfully completed KubeVirt VM outage scenario for VM: {vm_name}")
|
||||
|
||||
return self.pods_status
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error executing KubeVirt VM outage scenario: {e}")
|
||||
log_exception(e)
|
||||
return 1
|
||||
|
||||
def validate_environment(self, vm_name: str, namespace: str) -> bool:
|
||||
"""
|
||||
Validate that KubeVirt is installed and the specified VM exists.
|
||||
|
||||
:param vm_name: Name of the VM to validate
|
||||
:param namespace: Namespace of the VM
|
||||
:return: True if environment is valid, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Check if KubeVirt CRDs exist
|
||||
crd_list = self.custom_object_client.list_namespaced_custom_object("kubevirt.io","v1",namespace,"virtualmachines")
|
||||
kubevirt_crds = [crd for crd in crd_list.items() ]
|
||||
|
||||
if not kubevirt_crds:
|
||||
logging.error("KubeVirt CRDs not found. Ensure KubeVirt/CNV is installed in the cluster")
|
||||
return False
|
||||
|
||||
# Check if VMI exists
|
||||
vmi = self.get_vmi(vm_name, namespace)
|
||||
if not vmi:
|
||||
logging.error(f"VMI {vm_name} not found in namespace {namespace}")
|
||||
return False
|
||||
|
||||
logging.info(f"Validated environment: KubeVirt is installed and VMI {vm_name} exists")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error validating environment: {e}")
|
||||
return False
|
||||
|
||||
def patch_vm_spec(self, vm_name: str, namespace: str, running: bool) -> bool:
|
||||
"""
|
||||
Patch the VM spec to enable/disable auto-restart.
|
||||
|
||||
:param vm_name: Name of the VM to patch
|
||||
:param namespace: Namespace of the VM
|
||||
:param running: Whether the VM should be set to running state
|
||||
:return: True if patch was successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Get the VM object first to get its current spec
|
||||
vm = self.custom_object_client.get_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachines",
|
||||
name=vm_name
|
||||
)
|
||||
|
||||
# Update the running state
|
||||
if 'spec' not in vm:
|
||||
vm['spec'] = {}
|
||||
vm['spec']['running'] = running
|
||||
|
||||
# Apply the patch
|
||||
self.custom_object_client.patch_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachines",
|
||||
name=vm_name,
|
||||
body=vm
|
||||
)
|
||||
return True
|
||||
|
||||
except ApiException as e:
|
||||
logging.error(f"Failed to patch VM {vm_name}: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error patching VM {vm_name}: {e}")
|
||||
return False
|
||||
|
||||
def delete_vmi(self, vm_name: str, namespace: str, disable_auto_restart: bool = False, timeout: int = 120) -> int:
|
||||
"""
|
||||
Delete a Virtual Machine Instance to simulate a VM outage.
|
||||
|
||||
:param vm_name: Name of the VMI to delete
|
||||
:param namespace: Namespace of the VMI
|
||||
:return: 0 for success, 1 for failure
|
||||
"""
|
||||
try:
|
||||
logging.info(f"Injecting chaos: Deleting VMI {vm_name} in namespace {namespace}")
|
||||
|
||||
# If auto-restart should be disabled, patch the VM spec first
|
||||
if disable_auto_restart:
|
||||
logging.info(f"Disabling auto-restart for VM {vm_name} by setting spec.running=False")
|
||||
if not self.patch_vm_spec(vm_name, namespace, running=False):
|
||||
logging.error("Failed to disable auto-restart for VM"
|
||||
" - proceeding with deletion but VM may auto-restart")
|
||||
start_creation_time = self.original_vmi.get('metadata', {}).get('creationTimestamp')
|
||||
start_time = time.time()
|
||||
try:
|
||||
self.custom_object_client.delete_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachineinstances",
|
||||
name=vm_name
|
||||
)
|
||||
except ApiException as e:
|
||||
if e.status == 404:
|
||||
logging.warning(f"VMI {vm_name} not found during deletion")
|
||||
return 1
|
||||
else:
|
||||
logging.error(f"API error during VMI deletion: {e}")
|
||||
return 1
|
||||
|
||||
# Wait for the VMI to be deleted
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
deleted_vmi = self.get_vmi(vm_name, namespace)
|
||||
if deleted_vmi:
|
||||
if start_creation_time != deleted_vmi.get('metadata', {}).get('creationTimestamp'):
|
||||
logging.info(f"VMI {vm_name} successfully recreated")
|
||||
self.affected_pod.pod_rescheduling_time = time.time() - start_time
|
||||
return 0
|
||||
else:
|
||||
logging.info(f"VMI {vm_name} successfully deleted")
|
||||
time.sleep(1)
|
||||
|
||||
logging.error(f"Timed out waiting for VMI {vm_name} to be deleted")
|
||||
self.pods_status.unrecovered = self.affected_pod
|
||||
return 1
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error deleting VMI {vm_name}: {e}")
|
||||
log_exception(e)
|
||||
self.pods_status.unrecovered = self.affected_pod
|
||||
return 1
|
||||
|
||||
def wait_for_running(self, vm_name: str, namespace: str, timeout: int = 120) -> int:
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
|
||||
# Check current state once since we've already waited for the duration
|
||||
vmi = self.get_vmi(vm_name, namespace)
|
||||
|
||||
if vmi:
|
||||
if vmi.get('status', {}).get('phase') == "Running":
|
||||
end_time = time.time()
|
||||
self.affected_pod.pod_readiness_time = end_time - start_time
|
||||
|
||||
logging.info(f"VMI {vm_name} is already running")
|
||||
return 0
|
||||
logging.info(f"VMI {vm_name} exists but is not in Running state. Current state: {vmi.get('status', {}).get('phase')}")
|
||||
else:
|
||||
logging.info(f"VMI {vm_name} not yet recreated")
|
||||
time.sleep(1)
|
||||
return 1
|
||||
|
||||
|
||||
def recover(self, vm_name: str, namespace: str, disable_auto_restart: bool = False) -> int:
|
||||
"""
|
||||
Recover a deleted VMI, either by waiting for auto-recovery or manually recreating it.
|
||||
|
||||
:param vm_name: Name of the VMI to recover
|
||||
:param namespace: Namespace of the VMI
|
||||
:param disable_auto_restart: Whether auto-restart was disabled during injection
|
||||
:return: 0 for success, 1 for failure
|
||||
"""
|
||||
try:
|
||||
logging.info(f"Attempting to recover VMI {vm_name} in namespace {namespace}")
|
||||
|
||||
if self.original_vmi:
|
||||
logging.info(f"Auto-recovery didn't occur for VMI {vm_name}. Attempting manual recreation")
|
||||
|
||||
try:
|
||||
# Clean up server-generated fields
|
||||
vmi_dict = self.original_vmi.copy()
|
||||
if 'metadata' in vmi_dict:
|
||||
metadata = vmi_dict['metadata']
|
||||
for field in ['resourceVersion', 'uid', 'creationTimestamp', 'generation']:
|
||||
if field in metadata:
|
||||
del metadata[field]
|
||||
|
||||
# Create the VMI
|
||||
self.custom_object_client.create_namespaced_custom_object(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace=namespace,
|
||||
plural="virtualmachineinstances",
|
||||
body=vmi_dict
|
||||
)
|
||||
logging.info(f"Successfully recreated VMI {vm_name}")
|
||||
|
||||
# Wait for VMI to start running
|
||||
self.wait_for_running(vm_name,namespace)
|
||||
|
||||
logging.warning(f"VMI {vm_name} was recreated but didn't reach Running state in time")
|
||||
return 0 # Still consider it a success as the VMI was recreated
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error recreating VMI {vm_name}: {e}")
|
||||
log_exception(e)
|
||||
return 1
|
||||
else:
|
||||
logging.error(f"Failed to recover VMI {vm_name}: No original state captured and auto-recovery did not occur")
|
||||
return 1
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error recovering VMI {vm_name}: {e}")
|
||||
log_exception(e)
|
||||
return 1
|
||||
@@ -17,26 +17,14 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
scenario_telemetry: ScenarioTelemetry,
|
||||
) -> int:
|
||||
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
|
||||
kill_scenarios = [
|
||||
kill_scenario
|
||||
for kill_scenario in PLUGINS.unserialize_scenario(scenario)
|
||||
if kill_scenario["id"] == "kill-pods"
|
||||
]
|
||||
|
||||
try:
|
||||
self.start_monitoring(pool, kill_scenarios)
|
||||
PLUGINS.run(
|
||||
scenario,
|
||||
lib_telemetry.get_lib_kubernetes().get_kubeconfig_path(),
|
||||
krkn_config,
|
||||
run_uuid,
|
||||
)
|
||||
result = pool.join()
|
||||
scenario_telemetry.affected_pods = result
|
||||
if result.error:
|
||||
logging.error(f"NativeScenarioPlugin unrecovered pods: {result.error}")
|
||||
return 1
|
||||
|
||||
except Exception as e:
|
||||
logging.error("NativeScenarioPlugin exiting due to Exception %s" % e)
|
||||
@@ -47,46 +35,6 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
|
||||
|
||||
def get_scenario_types(self) -> list[str]:
|
||||
return [
|
||||
"pod_disruption_scenarios",
|
||||
"pod_network_scenarios",
|
||||
"ingress_node_scenarios"
|
||||
]
|
||||
|
||||
def start_monitoring(self, pool: PodsMonitorPool, scenarios: list[Any]):
|
||||
for kill_scenario in scenarios:
|
||||
recovery_time = kill_scenario["config"]["krkn_pod_recovery_time"]
|
||||
if (
|
||||
"namespace_pattern" in kill_scenario["config"]
|
||||
and "label_selector" in kill_scenario["config"]
|
||||
):
|
||||
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
|
||||
label_selector = kill_scenario["config"]["label_selector"]
|
||||
pool.select_and_monitor_by_namespace_pattern_and_label(
|
||||
namespace_pattern=namespace_pattern,
|
||||
label_selector=label_selector,
|
||||
max_timeout=recovery_time,
|
||||
)
|
||||
logging.info(
|
||||
f"waiting {recovery_time} seconds for pod recovery, "
|
||||
f"pod label selector: {label_selector} namespace pattern: {namespace_pattern}"
|
||||
)
|
||||
|
||||
elif (
|
||||
"namespace_pattern" in kill_scenario["config"]
|
||||
and "name_pattern" in kill_scenario["config"]
|
||||
):
|
||||
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
|
||||
name_pattern = kill_scenario["config"]["name_pattern"]
|
||||
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
|
||||
pod_name_pattern=name_pattern,
|
||||
namespace_pattern=namespace_pattern,
|
||||
max_timeout=recovery_time,
|
||||
)
|
||||
logging.info(
|
||||
f"waiting {recovery_time} seconds for pod recovery, "
|
||||
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
|
||||
)
|
||||
else:
|
||||
raise Exception(
|
||||
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
|
||||
)
|
||||
|
||||
@@ -4,7 +4,6 @@ import logging
|
||||
from os.path import abspath
|
||||
from typing import List, Any, Dict
|
||||
from krkn.scenario_plugins.native.run_python_plugin import run_python_file
|
||||
from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
|
||||
from krkn.scenario_plugins.native.network.ingress_shaping import network_chaos
|
||||
from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin import (
|
||||
pod_outage,
|
||||
@@ -148,13 +147,6 @@ class Plugins:
|
||||
|
||||
PLUGINS = Plugins(
|
||||
[
|
||||
PluginStep(
|
||||
kill_pods,
|
||||
[
|
||||
"error",
|
||||
],
|
||||
),
|
||||
PluginStep(wait_for_pods, ["error"]),
|
||||
PluginStep(run_python_file, ["error"]),
|
||||
PluginStep(network_chaos, ["error"]),
|
||||
PluginStep(pod_outage, ["error"]),
|
||||
|
||||
@@ -6,6 +6,7 @@ class NetworkChaosScenarioType(Enum):
|
||||
Node = 1
|
||||
Pod = 2
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseNetworkChaosConfig:
|
||||
supported_execution = ["serial", "parallel"]
|
||||
@@ -20,13 +21,22 @@ class BaseNetworkChaosConfig:
|
||||
def validate(self) -> list[str]:
|
||||
errors = []
|
||||
if self.execution is None:
|
||||
errors.append(f"execution cannot be None, supported values are: {','.join(self.supported_execution)}")
|
||||
errors.append(
|
||||
f"execution cannot be None, supported values are: {','.join(self.supported_execution)}"
|
||||
)
|
||||
if self.execution not in self.supported_execution:
|
||||
errors.append(f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}")
|
||||
if self.label_selector is None:
|
||||
errors.append(
|
||||
f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}"
|
||||
)
|
||||
if self.id == "node_network_filter" and self.label_selector is None:
|
||||
errors.append("label_selector cannot be None")
|
||||
if not isinstance(self.wait_duration, int):
|
||||
errors.append("wait_duration must be an int")
|
||||
if not isinstance(self.test_duration, int):
|
||||
errors.append("test_duration must be an int")
|
||||
return errors
|
||||
|
||||
|
||||
@dataclass
|
||||
class NetworkFilterConfig(BaseNetworkChaosConfig):
|
||||
ingress: bool
|
||||
@@ -34,8 +44,15 @@ class NetworkFilterConfig(BaseNetworkChaosConfig):
|
||||
interfaces: list[str]
|
||||
target: str
|
||||
ports: list[int]
|
||||
image: str
|
||||
protocols: list[str]
|
||||
|
||||
def validate(self) -> list[str]:
|
||||
errors = super().validate()
|
||||
# here further validations
|
||||
allowed_protocols = {"tcp", "udp"}
|
||||
if not set(self.protocols).issubset(allowed_protocols):
|
||||
errors.append(
|
||||
f"{self.protocols} contains not allowed protocols only tcp and udp is allowed"
|
||||
)
|
||||
return errors
|
||||
|
||||
@@ -3,19 +3,25 @@ import logging
|
||||
import queue
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import BaseNetworkChaosConfig, NetworkChaosScenarioType
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
BaseNetworkChaosConfig,
|
||||
NetworkChaosScenarioType,
|
||||
)
|
||||
|
||||
|
||||
class AbstractNetworkChaosModule(abc.ABC):
|
||||
"""
|
||||
The abstract class that needs to be implemented by each Network Chaos Scenario
|
||||
"""
|
||||
|
||||
kubecli: KrknTelemetryOpenshift
|
||||
base_network_config: BaseNetworkChaosConfig
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self, target: str, kubecli: KrknTelemetryOpenshift, error_queue: queue.Queue = None):
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
"""
|
||||
the entrypoint method for the Network Chaos Scenario
|
||||
:param target: The resource name that will be targeted by the scenario (Node Name, Pod Name etc.)
|
||||
:param kubecli: The `KrknTelemetryOpenshift` needed by the scenario to access to the krkn-lib methods
|
||||
:param error_queue: A queue that will be used by the plugin to push the errors raised during the execution of parallel modules
|
||||
"""
|
||||
pass
|
||||
@@ -28,31 +34,17 @@ class AbstractNetworkChaosModule(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_targets(self) -> list[str]:
|
||||
"""
|
||||
checks and returns the targets based on the common scenario configuration
|
||||
"""
|
||||
|
||||
def log_info(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for INFO severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.info(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.info(message)
|
||||
pass
|
||||
|
||||
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for WARNING severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.warning(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.warning(message)
|
||||
|
||||
|
||||
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for ERROR severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.error(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.error(message)
|
||||
def __init__(
|
||||
self,
|
||||
base_network_config: BaseNetworkChaosConfig,
|
||||
kubecli: KrknTelemetryOpenshift,
|
||||
):
|
||||
self.kubecli = kubecli
|
||||
self.base_network_config = base_network_config
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
|
||||
import yaml
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_random_string
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
@@ -16,87 +11,92 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
|
||||
deploy_network_filter_pod,
|
||||
apply_network_rules,
|
||||
clean_network_rules,
|
||||
generate_rules,
|
||||
get_default_interface,
|
||||
)
|
||||
|
||||
|
||||
class NodeNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
config: NetworkFilterConfig
|
||||
kubecli: KrknTelemetryOpenshift
|
||||
|
||||
def run(
|
||||
self,
|
||||
target: str,
|
||||
kubecli: KrknTelemetryOpenshift,
|
||||
error_queue: queue.Queue = None,
|
||||
):
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
parallel = False
|
||||
if error_queue:
|
||||
parallel = True
|
||||
try:
|
||||
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
|
||||
env = Environment(loader=file_loader, autoescape=True)
|
||||
pod_name = f"node-filter-{get_random_string(5)}"
|
||||
pod_template = env.get_template("templates/network-chaos.j2")
|
||||
pod_body = yaml.safe_load(
|
||||
pod_template.render(
|
||||
pod_name=pod_name,
|
||||
namespace=self.config.namespace,
|
||||
host_network=True,
|
||||
target=target,
|
||||
)
|
||||
)
|
||||
self.log_info(
|
||||
f"creating pod to filter "
|
||||
log_info(
|
||||
f"creating workload to filter node {target} network"
|
||||
f"ports {','.join([str(port) for port in self.config.ports])}, "
|
||||
f"ingress:{str(self.config.ingress)}, "
|
||||
f"egress:{str(self.config.egress)}",
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
kubecli.get_lib_kubernetes().create_pod(
|
||||
pod_body, self.config.namespace, 300
|
||||
|
||||
pod_name = f"node-filter-{get_random_string(5)}"
|
||||
deploy_network_filter_pod(
|
||||
self.config,
|
||||
target,
|
||||
pod_name,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
|
||||
if len(self.config.interfaces) == 0:
|
||||
interfaces = [
|
||||
self.get_default_interface(pod_name, self.config.namespace, kubecli)
|
||||
get_default_interface(
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
]
|
||||
self.log_info(f"detected default interface {interfaces[0]}")
|
||||
|
||||
log_info(
|
||||
f"detected default interface {interfaces[0]}", parallel, target
|
||||
)
|
||||
|
||||
else:
|
||||
interfaces = self.config.interfaces
|
||||
|
||||
input_rules, output_rules = self.generate_rules(interfaces)
|
||||
input_rules, output_rules = generate_rules(interfaces, self.config)
|
||||
|
||||
for rule in input_rules:
|
||||
self.log_info(f"applying iptables INPUT rule: {rule}", parallel, target)
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[rule], pod_name, self.config.namespace
|
||||
)
|
||||
for rule in output_rules:
|
||||
self.log_info(
|
||||
f"applying iptables OUTPUT rule: {rule}", parallel, target
|
||||
)
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[rule], pod_name, self.config.namespace
|
||||
)
|
||||
self.log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules"
|
||||
apply_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
time.sleep(self.config.test_duration)
|
||||
self.log_info("removing iptables rules")
|
||||
for _ in input_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[f"iptables -D INPUT 1"], pod_name, self.config.namespace
|
||||
)
|
||||
for _ in output_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[f"iptables -D OUTPUT 1"], pod_name, self.config.namespace
|
||||
)
|
||||
self.log_info(
|
||||
f"deleting network chaos pod {pod_name} from {self.config.namespace}"
|
||||
|
||||
log_info("removing iptables rules", parallel, target)
|
||||
|
||||
clean_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
)
|
||||
|
||||
kubecli.get_lib_kubernetes().delete_pod(pod_name, self.config.namespace)
|
||||
self.kubecli.get_lib_kubernetes().delete_pod(
|
||||
pod_name, self.config.namespace
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
if error_queue is None:
|
||||
@@ -104,33 +104,25 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
else:
|
||||
error_queue.put(str(e))
|
||||
|
||||
def __init__(self, config: NetworkFilterConfig):
|
||||
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
|
||||
super().__init__(config, kubecli)
|
||||
self.config = config
|
||||
|
||||
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
|
||||
return NetworkChaosScenarioType.Node, self.config
|
||||
|
||||
def get_default_interface(
|
||||
self, pod_name: str, namespace: str, kubecli: KrknTelemetryOpenshift
|
||||
) -> str:
|
||||
cmd = "ip r | grep default | awk '/default/ {print $5}'"
|
||||
output = kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[cmd], pod_name, namespace
|
||||
)
|
||||
return output.replace("\n", "")
|
||||
def get_targets(self) -> list[str]:
|
||||
if self.base_network_config.label_selector:
|
||||
return self.kubecli.get_lib_kubernetes().list_nodes(
|
||||
self.base_network_config.label_selector
|
||||
)
|
||||
else:
|
||||
if not self.config.target:
|
||||
raise Exception(
|
||||
"neither node selector nor node_name (target) specified, aborting."
|
||||
)
|
||||
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
|
||||
if self.config.target not in node_info:
|
||||
raise Exception(f"node {self.config.target} not found, aborting")
|
||||
|
||||
def generate_rules(self, interfaces: list[str]) -> (list[str], list[str]):
|
||||
input_rules = []
|
||||
output_rules = []
|
||||
for interface in interfaces:
|
||||
for port in self.config.ports:
|
||||
if self.config.egress:
|
||||
output_rules.append(
|
||||
f"iptables -I OUTPUT 1 -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
|
||||
if self.config.ingress:
|
||||
input_rules.append(
|
||||
f"iptables -I INPUT 1 -i {interface} -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
return input_rules, output_rules
|
||||
return [self.config.target]
|
||||
|
||||
@@ -0,0 +1,177 @@
|
||||
import queue
|
||||
import time
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_random_string
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
NetworkChaosScenarioType,
|
||||
BaseNetworkChaosConfig,
|
||||
NetworkFilterConfig,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
|
||||
deploy_network_filter_pod,
|
||||
get_default_interface,
|
||||
generate_namespaced_rules,
|
||||
apply_network_rules,
|
||||
clean_network_rules_namespaced,
|
||||
)
|
||||
|
||||
|
||||
class PodNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
config: NetworkFilterConfig
|
||||
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
parallel = False
|
||||
if error_queue:
|
||||
parallel = True
|
||||
try:
|
||||
pod_name = f"pod-filter-{get_random_string(5)}"
|
||||
container_name = f"fedora-container-{get_random_string(5)}"
|
||||
pod_info = self.kubecli.get_lib_kubernetes().get_pod_info(
|
||||
target, self.config.namespace
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"creating workload to filter pod {self.config.target} network"
|
||||
f"ports {','.join([str(port) for port in self.config.ports])}, "
|
||||
f"ingress:{str(self.config.ingress)}, "
|
||||
f"egress:{str(self.config.egress)}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
if not pod_info:
|
||||
raise Exception(
|
||||
f"impossible to retrieve infos for pod {self.config.target} namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
deploy_network_filter_pod(
|
||||
self.config,
|
||||
pod_info.nodeName,
|
||||
pod_name,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
container_name,
|
||||
)
|
||||
|
||||
if len(self.config.interfaces) == 0:
|
||||
interfaces = [
|
||||
get_default_interface(
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
]
|
||||
|
||||
log_info(
|
||||
f"detected default interface {interfaces[0]}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
else:
|
||||
interfaces = self.config.interfaces
|
||||
|
||||
container_ids = self.kubecli.get_lib_kubernetes().get_container_ids(
|
||||
target, self.config.namespace
|
||||
)
|
||||
|
||||
if len(container_ids) == 0:
|
||||
raise Exception(
|
||||
f"impossible to resolve container id for pod {target} namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
log_info(f"targeting container {container_ids[0]}", parallel, pod_name)
|
||||
|
||||
pids = self.kubecli.get_lib_kubernetes().get_pod_pids(
|
||||
base_pod_name=pod_name,
|
||||
base_pod_namespace=self.config.namespace,
|
||||
base_pod_container_name=container_name,
|
||||
pod_name=target,
|
||||
pod_namespace=self.config.namespace,
|
||||
pod_container_id=container_ids[0],
|
||||
)
|
||||
|
||||
if not pids:
|
||||
raise Exception(f"impossible to resolve pid for pod {target}")
|
||||
|
||||
log_info(
|
||||
f"resolved pids {pids} in node {pod_info.nodeName} for pod {target}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
input_rules, output_rules = generate_namespaced_rules(
|
||||
interfaces, self.config, pids
|
||||
)
|
||||
|
||||
apply_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
time.sleep(self.config.test_duration)
|
||||
|
||||
log_info("removing iptables rules", parallel, pod_name)
|
||||
|
||||
clean_network_rules_namespaced(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
pids,
|
||||
)
|
||||
|
||||
self.kubecli.get_lib_kubernetes().delete_pod(
|
||||
pod_name, self.config.namespace
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
if error_queue is None:
|
||||
raise e
|
||||
else:
|
||||
error_queue.put(str(e))
|
||||
|
||||
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
|
||||
super().__init__(config, kubecli)
|
||||
self.config = config
|
||||
|
||||
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
|
||||
return NetworkChaosScenarioType.Pod, self.config
|
||||
|
||||
def get_targets(self) -> list[str]:
|
||||
if not self.config.namespace:
|
||||
raise Exception("namespace not specified, aborting")
|
||||
if self.base_network_config.label_selector:
|
||||
return self.kubecli.get_lib_kubernetes().list_pods(
|
||||
self.config.namespace, self.config.label_selector
|
||||
)
|
||||
else:
|
||||
if not self.config.target:
|
||||
raise Exception(
|
||||
"neither node selector nor node_name (target) specified, aborting."
|
||||
)
|
||||
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
|
||||
self.config.target, self.config.namespace
|
||||
):
|
||||
raise Exception(
|
||||
f"pod {self.config.target} not found in namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
return [self.config.target]
|
||||
@@ -7,11 +7,16 @@ spec:
|
||||
{% if host_network %}
|
||||
hostNetwork: true
|
||||
{%endif%}
|
||||
hostPID: true
|
||||
nodeSelector:
|
||||
kubernetes.io/hostname: {{target}}
|
||||
tolerations:
|
||||
- key: "node-role.kubernetes.io/master"
|
||||
operator: "Exists"
|
||||
effect: "NoSchedule"
|
||||
containers:
|
||||
- name: fedora
|
||||
- name: {{container_name}}
|
||||
imagePullPolicy: Always
|
||||
image: quay.io/krkn-chaos/krkn-network-chaos:latest
|
||||
image: {{workload_image}}
|
||||
securityContext:
|
||||
privileged: true
|
||||
|
||||
31
krkn/scenario_plugins/network_chaos_ng/modules/utils.py
Normal file
31
krkn/scenario_plugins/network_chaos_ng/modules/utils.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import logging
|
||||
|
||||
|
||||
def log_info(message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for INFO severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.info(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.info(message)
|
||||
|
||||
|
||||
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for ERROR severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.error(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.error(message)
|
||||
|
||||
|
||||
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for WARNING severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.warning(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.warning(message)
|
||||
@@ -0,0 +1,138 @@
|
||||
import os
|
||||
|
||||
import yaml
|
||||
from jinja2 import FileSystemLoader, Environment
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
|
||||
|
||||
def generate_rules(
|
||||
interfaces: list[str], config: NetworkFilterConfig
|
||||
) -> (list[str], list[str]):
|
||||
input_rules = []
|
||||
output_rules = []
|
||||
for interface in interfaces:
|
||||
for port in config.ports:
|
||||
if config.egress:
|
||||
for protocol in set(config.protocols):
|
||||
output_rules.append(
|
||||
f"iptables -I OUTPUT 1 -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
|
||||
if config.ingress:
|
||||
for protocol in set(config.protocols):
|
||||
input_rules.append(
|
||||
f"iptables -I INPUT 1 -i {interface} -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
return input_rules, output_rules
|
||||
|
||||
|
||||
def generate_namespaced_rules(
|
||||
interfaces: list[str], config: NetworkFilterConfig, pids: list[str]
|
||||
) -> (list[str], list[str]):
|
||||
namespaced_input_rules: list[str] = []
|
||||
namespaced_output_rules: list[str] = []
|
||||
input_rules, output_rules = generate_rules(interfaces, config)
|
||||
for pid in pids:
|
||||
ns_input_rules = [
|
||||
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
|
||||
]
|
||||
ns_output_rules = [
|
||||
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
|
||||
]
|
||||
namespaced_input_rules.extend(ns_input_rules)
|
||||
namespaced_output_rules.extend(ns_output_rules)
|
||||
|
||||
return namespaced_input_rules, namespaced_output_rules
|
||||
|
||||
|
||||
def deploy_network_filter_pod(
|
||||
config: NetworkFilterConfig,
|
||||
target_node: str,
|
||||
pod_name: str,
|
||||
kubecli: KrknKubernetes,
|
||||
container_name: str = "fedora",
|
||||
):
|
||||
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
|
||||
env = Environment(loader=file_loader, autoescape=True)
|
||||
pod_template = env.get_template("templates/network-chaos.j2")
|
||||
pod_body = yaml.safe_load(
|
||||
pod_template.render(
|
||||
pod_name=pod_name,
|
||||
namespace=config.namespace,
|
||||
host_network=True,
|
||||
target=target_node,
|
||||
container_name=container_name,
|
||||
workload_image=config.image,
|
||||
)
|
||||
)
|
||||
|
||||
kubecli.create_pod(pod_body, config.namespace, 300)
|
||||
|
||||
|
||||
def apply_network_rules(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
parallel: bool,
|
||||
node_name: str,
|
||||
):
|
||||
for rule in input_rules:
|
||||
log_info(f"applying iptables INPUT rule: {rule}", parallel, node_name)
|
||||
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
|
||||
for rule in output_rules:
|
||||
log_info(f"applying iptables OUTPUT rule: {rule}", parallel, node_name)
|
||||
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
|
||||
|
||||
|
||||
def clean_network_rules(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
):
|
||||
for _ in input_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod([f"iptables -D INPUT 1"], pod_name, namespace)
|
||||
for _ in output_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod([f"iptables -D OUTPUT 1"], pod_name, namespace)
|
||||
|
||||
|
||||
def clean_network_rules_namespaced(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
pids: list[str],
|
||||
):
|
||||
for _ in input_rules:
|
||||
for pid in pids:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod(
|
||||
[f"nsenter --target {pid} --net -- iptables -D INPUT 1"],
|
||||
pod_name,
|
||||
namespace,
|
||||
)
|
||||
for _ in output_rules:
|
||||
for pid in pids:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod(
|
||||
[f"nsenter --target {pid} --net -- iptables -D OUTPUT 1"],
|
||||
pod_name,
|
||||
namespace,
|
||||
)
|
||||
|
||||
|
||||
def get_default_interface(
|
||||
pod_name: str, namespace: str, kubecli: KrknKubernetes
|
||||
) -> str:
|
||||
cmd = "ip r | grep default | awk '/default/ {print $5}'"
|
||||
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
|
||||
return output.replace("\n", "")
|
||||
@@ -1,14 +1,25 @@
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import AbstractNetworkChaosModule
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import NodeNetworkFilterModule
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import (
|
||||
NodeNetworkFilterModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_filter import (
|
||||
PodNetworkFilterModule,
|
||||
)
|
||||
|
||||
supported_modules = ["node_network_filter", "pod_network_filter"]
|
||||
|
||||
supported_modules = ["node_network_filter"]
|
||||
|
||||
class NetworkChaosFactory:
|
||||
|
||||
@staticmethod
|
||||
def get_instance(config: dict[str, str]) -> AbstractNetworkChaosModule:
|
||||
def get_instance(
|
||||
config: dict[str, str], kubecli: KrknTelemetryOpenshift
|
||||
) -> AbstractNetworkChaosModule:
|
||||
if config["id"] is None:
|
||||
raise Exception("network chaos id cannot be None")
|
||||
if config["id"] not in supported_modules:
|
||||
@@ -19,6 +30,10 @@ class NetworkChaosFactory:
|
||||
errors = config.validate()
|
||||
if len(errors) > 0:
|
||||
raise Exception(f"config validation errors: [{';'.join(errors)}]")
|
||||
return NodeNetworkFilterModule(config)
|
||||
|
||||
|
||||
return NodeNetworkFilterModule(config, kubecli)
|
||||
if config["id"] == "pod_network_filter":
|
||||
config = NetworkFilterConfig(**config)
|
||||
errors = config.validate()
|
||||
if len(errors) > 0:
|
||||
raise Exception(f"config validation errors: [{';'.join(errors)}]")
|
||||
return PodNetworkFilterModule(config, kubecli)
|
||||
|
||||
@@ -9,10 +9,6 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
|
||||
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
NetworkChaosScenarioType,
|
||||
BaseNetworkChaosConfig,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
@@ -39,56 +35,52 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
|
||||
)
|
||||
return 1
|
||||
for config in scenario_config:
|
||||
network_chaos = NetworkChaosFactory.get_instance(config)
|
||||
network_chaos_config = network_chaos.get_config()
|
||||
logging.info(
|
||||
f"running network_chaos scenario: {network_chaos_config[1].id}"
|
||||
network_chaos = NetworkChaosFactory.get_instance(
|
||||
config, lib_telemetry
|
||||
)
|
||||
if network_chaos_config[0] == NetworkChaosScenarioType.Node:
|
||||
targets = lib_telemetry.get_lib_kubernetes().list_nodes(
|
||||
network_chaos_config[1].label_selector
|
||||
)
|
||||
else:
|
||||
targets = lib_telemetry.get_lib_kubernetes().list_pods(
|
||||
network_chaos_config[1].namespace,
|
||||
network_chaos_config[1].label_selector,
|
||||
)
|
||||
network_chaos_type, network_chaos_config = (
|
||||
network_chaos.get_config()
|
||||
)
|
||||
logging.info(
|
||||
f"running network_chaos scenario: {network_chaos_config.id}"
|
||||
)
|
||||
targets = network_chaos.get_targets()
|
||||
if len(targets) == 0:
|
||||
logging.warning(
|
||||
f"no targets found for {network_chaos_config[1].id} "
|
||||
f"network chaos scenario with selector {network_chaos_config[1].label_selector} "
|
||||
f"with target type {network_chaos_config[0]}"
|
||||
f"no targets found for {network_chaos_config.id} "
|
||||
f"network chaos scenario with selector {network_chaos_config.label_selector} "
|
||||
f"with target type {network_chaos_type}"
|
||||
)
|
||||
|
||||
if network_chaos_config[1].instance_count != 0 and network_chaos_config[1].instance_count > len(targets):
|
||||
targets = random.sample(targets, network_chaos_config[1].instance_count)
|
||||
if (
|
||||
network_chaos_config.instance_count != 0
|
||||
and network_chaos_config.instance_count > len(targets)
|
||||
):
|
||||
targets = random.sample(
|
||||
targets, network_chaos_config.instance_count
|
||||
)
|
||||
|
||||
if network_chaos_config[1].execution == "parallel":
|
||||
self.run_parallel(targets, network_chaos, lib_telemetry)
|
||||
if network_chaos_config.execution == "parallel":
|
||||
self.run_parallel(targets, network_chaos)
|
||||
else:
|
||||
self.run_serial(targets, network_chaos, lib_telemetry)
|
||||
self.run_serial(targets, network_chaos)
|
||||
if len(config) > 1:
|
||||
logging.info(f"waiting {network_chaos_config[1].wait_duration} seconds before running the next "
|
||||
f"Network Chaos NG Module")
|
||||
time.sleep(network_chaos_config[1].wait_duration)
|
||||
logging.info(
|
||||
f"waiting {network_chaos_config.wait_duration} seconds before running the next "
|
||||
f"Network Chaos NG Module"
|
||||
)
|
||||
time.sleep(network_chaos_config.wait_duration)
|
||||
except Exception as e:
|
||||
logging.error(str(e))
|
||||
return 1
|
||||
return 0
|
||||
|
||||
def run_parallel(
|
||||
self,
|
||||
targets: list[str],
|
||||
module: AbstractNetworkChaosModule,
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
):
|
||||
def run_parallel(self, targets: list[str], module: AbstractNetworkChaosModule):
|
||||
error_queue = queue.Queue()
|
||||
threads = []
|
||||
errors = []
|
||||
for target in targets:
|
||||
thread = threading.Thread(
|
||||
target=module.run, args=[target, lib_telemetry, error_queue]
|
||||
)
|
||||
thread = threading.Thread(target=module.run, args=[target, error_queue])
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
for thread in threads:
|
||||
@@ -103,14 +95,9 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
|
||||
f"module {module.get_config()[1].id} execution failed: [{';'.join(errors)}]"
|
||||
)
|
||||
|
||||
def run_serial(
|
||||
self,
|
||||
targets: list[str],
|
||||
module: AbstractNetworkChaosModule,
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
):
|
||||
def run_serial(self, targets: list[str], module: AbstractNetworkChaosModule):
|
||||
for target in targets:
|
||||
module.run(target, lib_telemetry)
|
||||
module.run(target)
|
||||
|
||||
def get_scenario_types(self) -> list[str]:
|
||||
return ["network_chaos_ng_scenarios"]
|
||||
|
||||
@@ -10,6 +10,7 @@ import time
|
||||
import traceback
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
|
||||
from krkn_lib.utils import get_random_string
|
||||
|
||||
class BM:
|
||||
def __init__(self, bm_info, user, passwd):
|
||||
@@ -21,6 +22,17 @@ class BM:
|
||||
with oc.project("openshift-machine-api"):
|
||||
return oc.selector("node/" + node_name).object()
|
||||
|
||||
def get_bm_disks(self, node_name):
|
||||
if (
|
||||
self.bm_info is not None
|
||||
and node_name in self.bm_info
|
||||
and "disks" in self.bm_info[node_name]
|
||||
):
|
||||
return self.bm_info[node_name]["disks"]
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
# Get the ipmi or other BMC address of the baremetal node
|
||||
def get_bmc_addr(self, node_name):
|
||||
# Addresses in the config get higher priority.
|
||||
@@ -228,3 +240,104 @@ class bm_node_scenarios(abstract_node_scenarios):
|
||||
logging.error("node_reboot_scenario injection failed!")
|
||||
raise e
|
||||
self.affected_nodes_status.affected_nodes.append(affected_node)
|
||||
|
||||
def node_disk_detach_attach_scenario(self, instance_kill_count, node, timeout, duration):
|
||||
logging.info("Starting disk_detach_attach_scenario injection")
|
||||
disk_attachment_details = self.get_disk_attachment_info(instance_kill_count, node)
|
||||
if disk_attachment_details:
|
||||
self.disk_detach_scenario(instance_kill_count, node, disk_attachment_details, timeout)
|
||||
logging.info("Waiting for %s seconds before attaching the disk" % (duration))
|
||||
time.sleep(duration)
|
||||
self.disk_attach_scenario(instance_kill_count, node, disk_attachment_details)
|
||||
logging.info("node_disk_detach_attach_scenario has been successfully injected!")
|
||||
else:
|
||||
logging.error("Node %s has only root disk attached" % (node))
|
||||
logging.error("node_disk_detach_attach_scenario failed!")
|
||||
|
||||
|
||||
# Get volume attachment info
|
||||
def get_disk_attachment_info(self, instance_kill_count, node):
|
||||
for _ in range(instance_kill_count):
|
||||
try:
|
||||
logging.info("Obtaining disk attachment information")
|
||||
user_disks= self.bm.get_bm_disks(node)
|
||||
disk_pod_name = f"disk-pod-{get_random_string(5)}"
|
||||
cmd = '''bootdev=$(lsblk -no PKNAME $(findmnt -no SOURCE /boot));
|
||||
for path in /sys/block/*/device/state; do
|
||||
dev=$(basename $(dirname $(dirname "$path")));
|
||||
[[ "$dev" != "$bootdev" ]] && echo "$dev";
|
||||
done'''
|
||||
pod_command = ["chroot /host /bin/sh -c '" + cmd + "'"]
|
||||
disk_response = self.kubecli.exec_command_on_node(
|
||||
node, pod_command, disk_pod_name, "default"
|
||||
)
|
||||
logging.info("Disk response: %s" % (disk_response))
|
||||
node_disks = [disk for disk in disk_response.split("\n") if disk]
|
||||
logging.info("Node disks: %s" % (node_disks))
|
||||
offline_disks = [disk for disk in user_disks if disk in node_disks]
|
||||
return offline_disks if offline_disks else node_disks
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Failed to obtain disk attachment information of %s node. "
|
||||
"Encounteres following exception: %s." % (node, e)
|
||||
)
|
||||
raise RuntimeError()
|
||||
finally:
|
||||
self.kubecli.delete_pod(disk_pod_name, "default")
|
||||
|
||||
# Node scenario to detach the volume
|
||||
def disk_detach_scenario(self, instance_kill_count, node, disk_attachment_details, timeout):
|
||||
for _ in range(instance_kill_count):
|
||||
try:
|
||||
logging.info("Starting disk_detach_scenario injection")
|
||||
logging.info(
|
||||
"Detaching the %s disks from instance %s "
|
||||
% (disk_attachment_details, node)
|
||||
)
|
||||
disk_pod_name = f"detach-disk-pod-{get_random_string(5)}"
|
||||
detach_disk_command=''
|
||||
for disk in disk_attachment_details:
|
||||
detach_disk_command = detach_disk_command + "echo offline > /sys/block/" + disk + "/device/state;"
|
||||
pod_command = ["chroot /host /bin/sh -c '" + detach_disk_command + "'"]
|
||||
cmd_output = self.kubecli.exec_command_on_node(
|
||||
node, pod_command, disk_pod_name, "default"
|
||||
)
|
||||
logging.info("Disk command output: %s" % (cmd_output))
|
||||
logging.info("Disk %s has been detached from %s node" % (disk_attachment_details, node))
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Failed to detach disk from %s node. Encountered following"
|
||||
"exception: %s." % (node, e)
|
||||
)
|
||||
logging.debug("")
|
||||
raise RuntimeError()
|
||||
finally:
|
||||
self.kubecli.delete_pod(disk_pod_name, "default")
|
||||
|
||||
# Node scenario to attach the volume
|
||||
def disk_attach_scenario(self, instance_kill_count, node, disk_attachment_details):
|
||||
for _ in range(instance_kill_count):
|
||||
try:
|
||||
logging.info(
|
||||
"Attaching the %s disks from instance %s "
|
||||
% (disk_attachment_details, node)
|
||||
)
|
||||
disk_pod_name = f"attach-disk-pod-{get_random_string(5)}"
|
||||
attach_disk_command=''
|
||||
for disk in disk_attachment_details:
|
||||
attach_disk_command = attach_disk_command + "echo running > /sys/block/" + disk + "/device/state;"
|
||||
pod_command = ["chroot /host /bin/sh -c '" + attach_disk_command + "'"]
|
||||
cmd_output = self.kubecli.exec_command_on_node(
|
||||
node, pod_command, disk_pod_name, "default"
|
||||
)
|
||||
logging.info("Disk command output: %s" % (cmd_output))
|
||||
logging.info("Disk %s has been attached to %s node" % (disk_attachment_details, node))
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Failed to attach disk to %s node. Encountered following"
|
||||
"exception: %s." % (node, e)
|
||||
)
|
||||
logging.debug("")
|
||||
raise RuntimeError()
|
||||
finally:
|
||||
self.kubecli.delete_pod(disk_pod_name, "default")
|
||||
|
||||
@@ -1,16 +1,10 @@
|
||||
import datetime
|
||||
import time
|
||||
import random
|
||||
import logging
|
||||
import paramiko
|
||||
from krkn_lib.models.k8s import AffectedNode
|
||||
import krkn.invoke.command as runcommand
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
|
||||
from krkn_lib.models.k8s import AffectedNode
|
||||
|
||||
node_general = False
|
||||
|
||||
|
||||
def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
|
||||
killable_nodes = kubecli.list_killable_nodes()
|
||||
@@ -65,14 +59,6 @@ def wait_for_unknown_status(node, timeout, kubecli: KrknKubernetes, affected_nod
|
||||
return affected_node
|
||||
|
||||
|
||||
# Get the ip of the cluster node
|
||||
def get_node_ip(node):
|
||||
return runcommand.invoke(
|
||||
"kubectl get node %s -o "
|
||||
"jsonpath='{.status.addresses[?(@.type==\"InternalIP\")].address}'" % (node)
|
||||
)
|
||||
|
||||
|
||||
def check_service_status(node, service, ssh_private_key, timeout):
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
0
krkn/scenario_plugins/pod_disruption/__init__.py
Normal file
0
krkn/scenario_plugins/pod_disruption/__init__.py
Normal file
21
krkn/scenario_plugins/pod_disruption/models/models.py
Normal file
21
krkn/scenario_plugins/pod_disruption/models/models.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class InputParams:
|
||||
def __init__(self, config: dict[str,any] = None):
|
||||
if config:
|
||||
self.kill = config["kill"] if "kill" in config else 1
|
||||
self.timeout = config["timeout"] if "timeout" in config else 120
|
||||
self.duration = config["duration"] if "duration" in config else 10
|
||||
self.krkn_pod_recovery_time = config["krkn_pod_recovery_time"] if "krkn_pod_recovery_time" in config else 120
|
||||
self.label_selector = config["label_selector"] if "label_selector" in config else ""
|
||||
self.namespace_pattern = config["namespace_pattern"] if "namespace_pattern" in config else ""
|
||||
self.name_pattern = config["name_pattern"] if "name_pattern" in config else ""
|
||||
|
||||
namespace_pattern: str
|
||||
krkn_pod_recovery_time: int
|
||||
timeout: int
|
||||
duration: int
|
||||
kill: int
|
||||
label_selector: str
|
||||
name_pattern: str
|
||||
@@ -0,0 +1,164 @@
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
import yaml
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
|
||||
from krkn.scenario_plugins.pod_disruption.models.models import InputParams
|
||||
from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_yaml_item_value
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass
|
||||
|
||||
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
|
||||
|
||||
@dataclass
|
||||
class Pod:
|
||||
namespace: str
|
||||
name: str
|
||||
creation_timestamp : str
|
||||
|
||||
class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
|
||||
def run(
|
||||
self,
|
||||
run_uuid: str,
|
||||
scenario: str,
|
||||
krkn_config: dict[str, any],
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
scenario_telemetry: ScenarioTelemetry,
|
||||
) -> int:
|
||||
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
|
||||
try:
|
||||
with open(scenario, "r") as f:
|
||||
cont_scenario_config = yaml.full_load(f)
|
||||
for kill_scenario in cont_scenario_config:
|
||||
kill_scenario_config = InputParams(kill_scenario["config"])
|
||||
self.start_monitoring(
|
||||
kill_scenario_config, pool
|
||||
)
|
||||
return_status = self.killing_pods(
|
||||
kill_scenario_config, lib_telemetry.get_lib_kubernetes()
|
||||
)
|
||||
if return_status != 0:
|
||||
result = pool.cancel()
|
||||
else:
|
||||
result = pool.join()
|
||||
if result.error:
|
||||
logging.error(
|
||||
logging.error(
|
||||
f"PodDisruptionScenariosPlugin pods failed to recovery: {result.error}"
|
||||
)
|
||||
)
|
||||
return 1
|
||||
|
||||
scenario_telemetry.affected_pods = result
|
||||
|
||||
except (RuntimeError, Exception) as e:
|
||||
logging.error("PodDisruptionScenariosPlugin exiting due to Exception %s" % e)
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_scenario_types(self) -> list[str]:
|
||||
return ["pod_disruption_scenarios"]
|
||||
|
||||
def start_monitoring(self, kill_scenario: InputParams, pool: PodsMonitorPool):
|
||||
|
||||
recovery_time = kill_scenario.krkn_pod_recovery_time
|
||||
if (
|
||||
kill_scenario.namespace_pattern
|
||||
and kill_scenario.label_selector
|
||||
):
|
||||
namespace_pattern = kill_scenario.namespace_pattern
|
||||
label_selector = kill_scenario.label_selector
|
||||
pool.select_and_monitor_by_namespace_pattern_and_label(
|
||||
namespace_pattern=namespace_pattern,
|
||||
label_selector=label_selector,
|
||||
max_timeout=recovery_time,
|
||||
field_selector="status.phase=Running"
|
||||
)
|
||||
logging.info(
|
||||
f"waiting up to {recovery_time} seconds for pod recovery, "
|
||||
f"pod label pattern: {label_selector} namespace pattern: {namespace_pattern}"
|
||||
)
|
||||
|
||||
elif (
|
||||
kill_scenario.namespace_pattern
|
||||
and kill_scenario.name_pattern
|
||||
):
|
||||
namespace_pattern = kill_scenario.namespace_pattern
|
||||
name_pattern = kill_scenario.name_pattern
|
||||
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
|
||||
pod_name_pattern=name_pattern,
|
||||
namespace_pattern=namespace_pattern,
|
||||
max_timeout=recovery_time,
|
||||
field_selector="status.phase=Running"
|
||||
)
|
||||
logging.info(
|
||||
f"waiting up to {recovery_time} seconds for pod recovery, "
|
||||
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
|
||||
)
|
||||
else:
|
||||
raise Exception(
|
||||
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
|
||||
)
|
||||
|
||||
|
||||
def get_pods(self, name_pattern, label_selector,namespace, kubecli: KrknKubernetes, field_selector: str =None):
|
||||
if label_selector and name_pattern:
|
||||
logging.error('Only, one of name pattern or label pattern can be specified')
|
||||
elif label_selector:
|
||||
pods = kubecli.select_pods_by_namespace_pattern_and_label(label_selector=label_selector,namespace_pattern=namespace, field_selector=field_selector)
|
||||
elif name_pattern:
|
||||
pods = kubecli.select_pods_by_name_pattern_and_namespace_pattern(pod_name_pattern=name_pattern, namespace_pattern=namespace, field_selector=field_selector)
|
||||
else:
|
||||
logging.error('Name pattern or label pattern must be specified ')
|
||||
return pods
|
||||
|
||||
def killing_pods(self, config: InputParams, kubecli: KrknKubernetes):
|
||||
# region Select target pods
|
||||
|
||||
namespace = config.namespace_pattern
|
||||
if not namespace:
|
||||
logging.error('Namespace pattern must be specified')
|
||||
|
||||
pods = self.get_pods(config.name_pattern,config.label_selector,config.namespace_pattern, kubecli, field_selector="status.phase=Running")
|
||||
pods_count = len(pods)
|
||||
if len(pods) < config.kill:
|
||||
logging.error("Not enough pods match the criteria, expected {} but found only {} pods".format(
|
||||
config.kill, len(pods)))
|
||||
return 1
|
||||
|
||||
random.shuffle(pods)
|
||||
for i in range(config.kill):
|
||||
|
||||
pod = pods[i]
|
||||
logging.info(pod)
|
||||
logging.info(f'Deleting pod {pod[0]}')
|
||||
kubecli.delete_pod(pod[0], pod[1])
|
||||
|
||||
self.wait_for_pods(config.label_selector,config.name_pattern,config.namespace_pattern, pods_count, config.duration, config.timeout, kubecli)
|
||||
return 0
|
||||
|
||||
def wait_for_pods(
|
||||
self, label_selector, pod_name, namespace, pod_count, duration, wait_timeout, kubecli: KrknKubernetes
|
||||
):
|
||||
timeout = False
|
||||
start_time = datetime.now()
|
||||
|
||||
while not timeout:
|
||||
pods = self.get_pods(name_pattern=pod_name, label_selector=label_selector,namespace=namespace, field_selector="status.phase=Running", kubecli=kubecli)
|
||||
if pod_count == len(pods):
|
||||
return
|
||||
|
||||
time.sleep(duration)
|
||||
|
||||
now_time = datetime.now()
|
||||
|
||||
time_diff = now_time - start_time
|
||||
if time_diff.seconds > wait_timeout:
|
||||
logging.error("timeout while waiting for pods to come up")
|
||||
return 1
|
||||
return 0
|
||||
@@ -32,6 +32,7 @@ class ZoneOutageScenarioPlugin(AbstractScenarioPlugin):
|
||||
zone_outage_config_yaml = yaml.full_load(f)
|
||||
scenario_config = zone_outage_config_yaml["zone_outage"]
|
||||
cloud_type = scenario_config["cloud_type"]
|
||||
kube_check = get_yaml_item_value(scenario_config, "kube_check", True)
|
||||
start_time = int(time.time())
|
||||
if cloud_type.lower() == "aws":
|
||||
self.cloud_object = AWS()
|
||||
@@ -40,7 +41,7 @@ class ZoneOutageScenarioPlugin(AbstractScenarioPlugin):
|
||||
kubecli = lib_telemetry.get_lib_kubernetes()
|
||||
if cloud_type.lower() == "gcp":
|
||||
affected_nodes_status = AffectedNodeStatus()
|
||||
self.cloud_object = gcp_node_scenarios(kubecli, affected_nodes_status)
|
||||
self.cloud_object = gcp_node_scenarios(kubecli, kube_check, affected_nodes_status)
|
||||
self.node_based_zone(scenario_config, kubecli)
|
||||
affected_nodes_status = self.cloud_object.affected_nodes_status
|
||||
scenario_telemetry.affected_nodes.extend(affected_nodes_status.affected_nodes)
|
||||
|
||||
@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
|
||||
ibm_cloud_sdk_core==3.18.0
|
||||
ibm_vpc==0.20.0
|
||||
jinja2==3.1.6
|
||||
krkn-lib==5.0.2
|
||||
krkn-lib==5.1.0
|
||||
lxml==5.1.0
|
||||
kubernetes==28.1.0
|
||||
numpy==1.26.4
|
||||
@@ -28,7 +28,7 @@ pyfiglet==1.0.2
|
||||
pytest==8.0.0
|
||||
python-ipmi==0.5.4
|
||||
python-openstackclient==6.5.0
|
||||
requests==2.32.2
|
||||
requests==2.32.4
|
||||
service_identity==24.1.0
|
||||
PyYAML==6.0.1
|
||||
setuptools==78.1.1
|
||||
@@ -36,7 +36,5 @@ werkzeug==3.0.6
|
||||
wheel==0.42.0
|
||||
zope.interface==5.4.0
|
||||
|
||||
|
||||
git+https://github.com/krkn-chaos/arcaflow-plugin-kill-pod.git@v0.1.0
|
||||
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
|
||||
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability
|
||||
|
||||
5
scenarios/kind/pod_etcd.yml
Executable file
5
scenarios/kind/pod_etcd.yml
Executable file
@@ -0,0 +1,5 @@
|
||||
- id: kill-pods
|
||||
config:
|
||||
namespace_pattern: "kube-system"
|
||||
label_selector: "component=etcd"
|
||||
krkn_pod_recovery_time: 120
|
||||
@@ -1,13 +0,0 @@
|
||||
- id: node_network_filter
|
||||
wait_duration: 300
|
||||
test_duration: 100
|
||||
label_selector: "kubernetes.io/hostname=ip-10-0-39-182.us-east-2.compute.internal"
|
||||
namespace: 'default'
|
||||
instance_count: 1
|
||||
execution: parallel
|
||||
ingress: false
|
||||
egress: true
|
||||
target: node
|
||||
interfaces: []
|
||||
ports:
|
||||
- 2049
|
||||
16
scenarios/kube/node-network-filter.yml
Normal file
16
scenarios/kube/node-network-filter.yml
Normal file
@@ -0,0 +1,16 @@
|
||||
- id: node_network_filter
|
||||
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
|
||||
wait_duration: 1
|
||||
test_duration: 10
|
||||
label_selector: "<node_selector>"
|
||||
namespace: 'default'
|
||||
instance_count: 1
|
||||
execution: parallel
|
||||
ingress: false
|
||||
egress: true
|
||||
target: '<node_name>'
|
||||
interfaces: []
|
||||
ports:
|
||||
- 2309
|
||||
protocols:
|
||||
- tcp
|
||||
17
scenarios/kube/pod-network-filter.yml
Normal file
17
scenarios/kube/pod-network-filter.yml
Normal file
@@ -0,0 +1,17 @@
|
||||
- id: pod_network_filter
|
||||
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
|
||||
wait_duration: 1
|
||||
test_duration: 60
|
||||
label_selector: "<pod_selector>"
|
||||
namespace: 'default'
|
||||
instance_count: 1
|
||||
execution: parallel
|
||||
ingress: false
|
||||
egress: true
|
||||
target: "<pod_name>"
|
||||
interfaces: []
|
||||
protocols:
|
||||
- tcp
|
||||
- udp
|
||||
ports:
|
||||
- 53
|
||||
7
scenarios/kubevirt/kubevirt-vm-outage.yaml
Normal file
7
scenarios/kubevirt/kubevirt-vm-outage.yaml
Normal file
@@ -0,0 +1,7 @@
|
||||
scenarios:
|
||||
- name: "kubevirt outage test"
|
||||
scenario: kubevirt_vm_outage
|
||||
parameters:
|
||||
vm_name: <vm-name>
|
||||
namespace: <namespace>
|
||||
timeout: 60
|
||||
@@ -1,19 +1,33 @@
|
||||
node_scenarios:
|
||||
- actions: # Node chaos scenarios to be injected.
|
||||
- node_stop_start_scenario
|
||||
node_name: # Node on which scenario has to be injected.
|
||||
- actions: # Node chaos scenarios to be injected
|
||||
- node_stop_start_scenario # Action to run. Supported actions: node_stop, node_restart, node_stop_start etc. Please refer documentation
|
||||
node_name: # Node(s) on which scenario has to be injected separated by a comma
|
||||
label_selector: node-role.kubernetes.io/worker # When node_name is not specified, a node with matching label_selector is selected for node chaos scenario injection.
|
||||
instance_count: 1 # Number of nodes to perform action/select that match the label selector.
|
||||
instance_count: 1 # Number of nodes to perform action/select that match the label selector
|
||||
runs: 1 # Number of times to inject each scenario under actions (will perform on same node each time).
|
||||
timeout: 360 # Duration to wait for completion of node scenario injection.
|
||||
duration: 120 # Duration to stop the node before running the start action
|
||||
parallel: False # Run action on label or node name in parallel or sequential, set to true for parallel
|
||||
cloud_type: bm # Cloud type on which Kubernetes/OpenShift runs.
|
||||
bmc_user: defaultuser # For baremetal (bm) cloud type. The default IPMI username. Optional if specified for all machines.
|
||||
bmc_password: defaultpass # For baremetal (bm) cloud type. The default IPMI password. Optional if specified for all machines.
|
||||
kube_check: True # Run the kubernetes api calls to see if the node gets to a certain state during the node scenario
|
||||
bmc_user: defaultuser # For baremetal (bm) cloud type. The default IPMI username. Optional if specified for all machines
|
||||
bmc_password: defaultpass # For baremetal (bm) cloud type. The default IPMI password. Optional if specified for all machines
|
||||
bmc_info: # This section is here to specify baremetal per-machine info, so it is optional if there is no per-machine info.
|
||||
node-1: # The node name for the baremetal machine
|
||||
bmc_addr: mgmt-machine1.example.com # Optional. For baremetal nodes with the IPMI BMC address missing from 'oc get bmh'.
|
||||
bmc_addr: mgmt-machine1.example.com # Optional. For baremetal nodes with the IPMI BMC address missing from 'oc get bmh'
|
||||
node-2:
|
||||
bmc_addr: mgmt-machine2.example.com
|
||||
bmc_user: user # The baremetal IPMI user. Overrides the default IPMI user specified above. Optional if the default is set.
|
||||
bmc_user: user # The baremetal IPMI user. Overrides the default IPMI user specified above. Optional if the default is set
|
||||
bmc_password: pass # The baremetal IPMI password. Overrides the default IPMI user specified above. Optional if the default is set
|
||||
- actions:
|
||||
- node_disk_detach_attach_scenario
|
||||
node_name: node-1
|
||||
instance_count: 1
|
||||
runs: 1
|
||||
timeout: 360
|
||||
duration: 120
|
||||
parallel: False
|
||||
cloud_type: bm
|
||||
bmc_info:
|
||||
node-1:
|
||||
disks: ["sda", "sdb"] # List of disk devices to be used for disk detach/attach scenarios
|
||||
|
||||
215
tests/kubevirt_vm_outage/test_kubevirt_vm_outage.py
Normal file
215
tests/kubevirt_vm_outage/test_kubevirt_vm_outage.py
Normal file
@@ -0,0 +1,215 @@
|
||||
import unittest
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import yaml
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
|
||||
from krkn.scenario_plugins.kubevirt_vm_outage.kubevirt_vm_outage_scenario_plugin import KubevirtVmOutageScenarioPlugin
|
||||
|
||||
|
||||
class TestKubevirtVmOutageScenarioPlugin(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Set up test fixtures for KubevirtVmOutageScenarioPlugin
|
||||
"""
|
||||
self.plugin = KubevirtVmOutageScenarioPlugin()
|
||||
|
||||
# Create mock k8s client
|
||||
self.k8s_client = MagicMock()
|
||||
self.custom_object_client = MagicMock()
|
||||
self.k8s_client.custom_object_client = self.custom_object_client
|
||||
self.plugin.k8s_client = self.k8s_client
|
||||
|
||||
# Mock methods needed for KubeVirt operations
|
||||
self.k8s_client.list_custom_resource_definition = MagicMock()
|
||||
|
||||
# Mock custom resource definition list with KubeVirt CRDs
|
||||
crd_list = MagicMock()
|
||||
crd_item = MagicMock()
|
||||
crd_item.spec = MagicMock()
|
||||
crd_item.spec.group = "kubevirt.io"
|
||||
crd_list.items = [crd_item]
|
||||
self.k8s_client.list_custom_resource_definition.return_value = crd_list
|
||||
|
||||
# Mock VMI data
|
||||
self.mock_vmi = {
|
||||
"metadata": {
|
||||
"name": "test-vm",
|
||||
"namespace": "default"
|
||||
},
|
||||
"status": {
|
||||
"phase": "Running"
|
||||
}
|
||||
}
|
||||
|
||||
# Create test config
|
||||
self.config = {
|
||||
"scenarios": [
|
||||
{
|
||||
"name": "kubevirt outage test",
|
||||
"scenario": "kubevirt_vm_outage",
|
||||
"parameters": {
|
||||
"vm_name": "test-vm",
|
||||
"namespace": "default",
|
||||
"duration": 0
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Create a temporary config file
|
||||
import tempfile, os
|
||||
temp_dir = tempfile.gettempdir()
|
||||
self.scenario_file = os.path.join(temp_dir, "test_kubevirt_scenario.yaml")
|
||||
with open(self.scenario_file, "w") as f:
|
||||
yaml.dump(self.config, f)
|
||||
|
||||
# Mock dependencies
|
||||
self.telemetry = MagicMock(spec=KrknTelemetryOpenshift)
|
||||
self.scenario_telemetry = MagicMock(spec=ScenarioTelemetry)
|
||||
self.telemetry.get_lib_kubernetes.return_value = self.k8s_client
|
||||
|
||||
def test_successful_injection_and_recovery(self):
|
||||
"""
|
||||
Test successful deletion and recovery of a VMI
|
||||
"""
|
||||
# Mock get_vmi to return our mock VMI
|
||||
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
|
||||
# Mock inject and recover to simulate success
|
||||
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
|
||||
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
|
||||
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
|
||||
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
|
||||
|
||||
self.assertEqual(result, 0)
|
||||
mock_inject.assert_called_once_with("test-vm", "default", False)
|
||||
mock_recover.assert_called_once_with("test-vm", "default", False)
|
||||
|
||||
def test_injection_failure(self):
|
||||
"""
|
||||
Test failure during VMI deletion
|
||||
"""
|
||||
# Mock get_vmi to return our mock VMI
|
||||
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
|
||||
# Mock inject to simulate failure
|
||||
with patch.object(self.plugin, 'inject', return_value=1) as mock_inject:
|
||||
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
|
||||
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
|
||||
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
|
||||
|
||||
self.assertEqual(result, 1)
|
||||
mock_inject.assert_called_once_with("test-vm", "default", False)
|
||||
mock_recover.assert_not_called()
|
||||
|
||||
def test_disable_auto_restart(self):
|
||||
"""
|
||||
Test VM auto-restart can be disabled
|
||||
"""
|
||||
# Configure test with disable_auto_restart=True
|
||||
self.config["scenarios"][0]["parameters"]["disable_auto_restart"] = True
|
||||
|
||||
# Mock VM object for patching
|
||||
mock_vm = {
|
||||
"metadata": {"name": "test-vm", "namespace": "default"},
|
||||
"spec": {}
|
||||
}
|
||||
|
||||
# Mock get_vmi to return our mock VMI
|
||||
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
|
||||
# Mock VM patch operation
|
||||
with patch.object(self.plugin, 'patch_vm_spec') as mock_patch_vm:
|
||||
mock_patch_vm.return_value = True
|
||||
# Mock inject and recover
|
||||
with patch.object(self.plugin, 'inject', return_value=0) as mock_inject:
|
||||
with patch.object(self.plugin, 'recover', return_value=0) as mock_recover:
|
||||
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
|
||||
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
|
||||
|
||||
self.assertEqual(result, 0)
|
||||
# Should call patch_vm_spec to disable auto-restart
|
||||
mock_patch_vm.assert_any_call("test-vm", "default", False)
|
||||
# Should call patch_vm_spec to re-enable auto-restart during recovery
|
||||
mock_patch_vm.assert_any_call("test-vm", "default", True)
|
||||
mock_inject.assert_called_once_with("test-vm", "default", True)
|
||||
mock_recover.assert_called_once_with("test-vm", "default", True)
|
||||
|
||||
def test_recovery_when_vmi_does_not_exist(self):
|
||||
"""
|
||||
Test recovery logic when VMI does not exist after deletion
|
||||
"""
|
||||
# Store the original VMI in the plugin for recovery
|
||||
self.plugin.original_vmi = self.mock_vmi.copy()
|
||||
|
||||
# Create a cleaned vmi_dict as the plugin would
|
||||
vmi_dict = self.mock_vmi.copy()
|
||||
|
||||
# Set up running VMI data for after recovery
|
||||
running_vmi = {
|
||||
"metadata": {"name": "test-vm", "namespace": "default"},
|
||||
"status": {"phase": "Running"}
|
||||
}
|
||||
|
||||
# Set up time.time to immediately exceed the timeout for auto-recovery
|
||||
with patch('time.time', side_effect=[0, 301, 301, 301, 301, 310, 320]):
|
||||
# Mock get_vmi to always return None (not auto-recovered)
|
||||
with patch.object(self.plugin, 'get_vmi', side_effect=[None, None, running_vmi]):
|
||||
# Mock the custom object API to return success
|
||||
self.custom_object_client.create_namespaced_custom_object = MagicMock(return_value=running_vmi)
|
||||
|
||||
# Run recovery with mocked time.sleep
|
||||
with patch('time.sleep'):
|
||||
result = self.plugin.recover("test-vm", "default", False)
|
||||
|
||||
self.assertEqual(result, 0)
|
||||
# Verify create was called with the right arguments for our API version and kind
|
||||
self.custom_object_client.create_namespaced_custom_object.assert_called_once_with(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace="default",
|
||||
plural="virtualmachineinstances",
|
||||
body=vmi_dict
|
||||
)
|
||||
|
||||
def test_validation_failure(self):
|
||||
"""
|
||||
Test validation failure when KubeVirt is not installed
|
||||
"""
|
||||
# Mock empty CRD list (no KubeVirt CRDs)
|
||||
empty_crd_list = MagicMock()
|
||||
empty_crd_list.items = []
|
||||
self.k8s_client.list_custom_resource_definition.return_value = empty_crd_list
|
||||
|
||||
with patch("builtins.open", unittest.mock.mock_open(read_data=yaml.dump(self.config))):
|
||||
result = self.plugin.run("test-uuid", self.scenario_file, {}, self.telemetry, self.scenario_telemetry)
|
||||
|
||||
self.assertEqual(result, 1)
|
||||
|
||||
def test_delete_vmi_timeout(self):
|
||||
"""
|
||||
Test timeout during VMI deletion
|
||||
"""
|
||||
# Mock successful delete operation
|
||||
self.custom_object_client.delete_namespaced_custom_object = MagicMock(return_value={})
|
||||
|
||||
# Mock that get_vmi always returns VMI (never gets deleted)
|
||||
with patch.object(self.plugin, 'get_vmi', return_value=self.mock_vmi):
|
||||
# Simulate timeout by making time.time return values that exceed the timeout
|
||||
with patch('time.sleep'), patch('time.time', side_effect=[0, 10, 20, 130, 130, 130, 130, 140]):
|
||||
result = self.plugin.inject("test-vm", "default", False)
|
||||
|
||||
self.assertEqual(result, 1)
|
||||
self.custom_object_client.delete_namespaced_custom_object.assert_called_once_with(
|
||||
group="kubevirt.io",
|
||||
version="v1",
|
||||
namespace="default",
|
||||
plural="virtualmachineinstances",
|
||||
name="test-vm"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user