Compare commits

..

11 Commits

Author SHA1 Message Date
Ashish Mahajan
8dad2a3996 fix: use per-URL status_code in HealthChecker telemetry (#1091)
Signed-off-by: AR21SM <mahajanashishar21sm@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2026-02-19 09:25:03 -05:00
Tullio Sebastiani
cebc60f5a8 Network chaos NG porting - pod network chaos node network chaos (#991)
* fix ibm

Signed-off-by: Paige Patton <prubenda@redhat.com>

* type hint fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* pod network chaos plugin structure + utils method refactoring

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* Pod network chaos plugin

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* Node network chaos plugin

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* default config files

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* config.yaml

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* all field optional

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* minor fixes

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* minor nit on config

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* utils unit tests

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* PodNetworkChaos unit tests

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* NodeNetworkChaos unit test

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* PodNetworkChaos functional test

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* NodeNetworkChaos functional test

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added functional tests to the gh action

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* unit test fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* changed test order + resource rename

* functional tests fix

smallchange

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fix requirements

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* changed pod test target

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added kind port mapping and removed portforwarding

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

test fixes

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

test fixes

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Paige Patton <prubenda@redhat.com>
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
Co-authored-by: Paige Patton <prubenda@redhat.com>
2026-02-18 16:20:16 +01:00
Darshan Jain
2065443622 collect ERROR and CRITICAL logs and send to elastic search (#1147) (#1150)
* collect ERROR and CRITICAL logs and send to elastic search

Signed-off-by: ddjain <darjain@redhat.com>

* bump up krkn-lib to 6.0.3

Signed-off-by: ddjain <darjain@redhat.com>

---------

Signed-off-by: ddjain <darjain@redhat.com>
2026-02-18 18:26:14 +05:30
Ashish Mahajan
b6ef7fa052 fix: use list comprehension to avoid skipping nodes during exclusion (#1059)
Fixes #1058

Signed-off-by: AR21SM <mahajanashishar21sm@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2026-02-17 15:20:10 -05:00
Paige Patton
4f305e78aa remove chaos ai
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-02-11 13:44:13 -05:00
dependabot[bot]
b17e933134 Bump pillow from 10.3.0 to 12.1.1 in /utils/chaos_ai (#1157)
Bumps [pillow](https://github.com/python-pillow/Pillow) from 10.3.0 to 12.1.1.
- [Release notes](https://github.com/python-pillow/Pillow/releases)
- [Changelog](https://github.com/python-pillow/Pillow/blob/main/CHANGES.rst)
- [Commits](https://github.com/python-pillow/Pillow/compare/10.3.0...12.1.1)

---
updated-dependencies:
- dependency-name: pillow
  dependency-version: 12.1.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-11 10:08:42 -05:00
Paige Patton
beea484597 adding vm ware tests (#1133)
Signed-off-by: Paige Patton <paigepatton@Paiges-MacBook-Air.local>
Signed-off-by: Paige Patton <prubenda@redhat.com>
Co-authored-by: Paige Patton <paigepatton@Paiges-MacBook-Air.local>
2026-02-10 16:24:26 -05:00
Paige Patton
0222b0f161 fix ibm (#1155)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2026-02-10 10:09:28 -05:00
Ashish Mahajan
f7e674d5ad docs: fix typos in logs, comments, and documentation (#1079)
Signed-off-by: AR21SM <mahajanashishar21sm@gmail.com>
2026-02-09 09:48:51 -05:00
Ashish Mahajan
7aea12ce6c fix(VirtChecker): handle empty VMI interfaces list (#1072)
Signed-off-by: AR21SM <mahajanashishar21sm@gmail.com>
Co-authored-by: Paige Patton <64206430+paigerube14@users.noreply.github.com>
2026-02-09 08:29:48 -05:00
Darshan Jain
625e1e90cf feat: add color-coded console logging (#1122) (#1146)
Signed-off-by: ddjain <darjain@redhat.com>
2026-02-05 14:27:52 +05:30
68 changed files with 3679 additions and 1591 deletions

View File

@@ -43,11 +43,11 @@ jobs:
- name: Deploy test workloads
run: |
es_pod_name=$(kubectl get pods -l "app=elasticsearch-master" -o name)
echo "POD_NAME: $es_pod_name"
kubectl --namespace default port-forward $es_pod_name 9200 &
prom_name=$(kubectl get pods -n monitoring -l "app.kubernetes.io/name=prometheus" -o name)
kubectl --namespace monitoring port-forward $prom_name 9090 &
# es_pod_name=$(kubectl get pods -l "app=elasticsearch-master" -o name)
# echo "POD_NAME: $es_pod_name"
# kubectl --namespace default port-forward $es_pod_name 9200 &
# prom_name=$(kubectl get pods -n monitoring -l "app.kubernetes.io/name=prometheus" -o name)
# kubectl --namespace monitoring port-forward $prom_name 9090 &
# Wait for Elasticsearch to be ready
echo "Waiting for Elasticsearch to be ready..."
@@ -85,7 +85,7 @@ jobs:
yq -i '.elastic.enable_elastic=False' CI/config/common_test_config.yaml
yq -i '.elastic.password="${{env.ELASTIC_PASSWORD}}"' CI/config/common_test_config.yaml
yq -i '.performance_monitoring.prometheus_url="http://localhost:9090"' CI/config/common_test_config.yaml
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_app_outages" > ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
echo "test_cpu_hog" >> ./CI/tests/functional_tests
echo "test_customapp_pod" >> ./CI/tests/functional_tests
@@ -94,13 +94,17 @@ jobs:
echo "test_namespace" >> ./CI/tests/functional_tests
echo "test_net_chaos" >> ./CI/tests/functional_tests
echo "test_node" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
echo "test_pod_error" >> ./CI/tests/functional_tests
echo "test_service_hijacking" >> ./CI/tests/functional_tests
echo "test_pod_network_filter" >> ./CI/tests/functional_tests
echo "test_pod_server" >> ./CI/tests/functional_tests
echo "test_time" >> ./CI/tests/functional_tests
echo "test_node_network_chaos" >> ./CI/tests/functional_tests
echo "test_pod_network_chaos" >> ./CI/tests/functional_tests
echo "test_pod_error" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
# echo "test_pvc" >> ./CI/tests/functional_tests
# Push on main only steps + all other functional to collect coverage
# for the badge

View File

@@ -42,7 +42,7 @@ telemetry:
prometheus_backup: True # enables/disables prometheus data collection
full_prometheus_backup: False # if is set to False only the /prometheus/wal folder will be downloaded.
backup_threads: 5 # number of telemetry download/upload threads
archive_path: /tmp # local path where the archive files will be temporarly stored
archive_path: /tmp # local path where the archive files will be temporarily stored
max_retries: 0 # maximum number of upload retries (if 0 will retry forever)
run_tag: '' # if set, this will be appended to the run folder in the bucket (useful to group the runs)
archive_size: 10000 # the size of the prometheus data archive size in KB. The lower the size of archive is

View File

@@ -0,0 +1,165 @@
set -xeEo pipefail
source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_node_network_chaos {
echo "Starting node network chaos functional test"
# Get a worker node
get_node
export TARGET_NODE=$(echo $WORKER_NODE | awk '{print $1}')
echo "Target node: $TARGET_NODE"
# Deploy nginx workload on the target node
echo "Deploying nginx workload on $TARGET_NODE..."
kubectl create deployment nginx-node-net-chaos --image=nginx:latest
# Add node selector to ensure pod runs on target node
kubectl patch deployment nginx-node-net-chaos -p '{"spec":{"template":{"spec":{"nodeSelector":{"kubernetes.io/hostname":"'$TARGET_NODE'"}}}}}'
# Expose service
kubectl expose deployment nginx-node-net-chaos --port=80 --target-port=80 --name=nginx-node-net-chaos-svc
# Wait for nginx to be ready
echo "Waiting for nginx pod to be ready on $TARGET_NODE..."
kubectl wait --for=condition=ready pod -l app=nginx-node-net-chaos --timeout=120s
# Verify pod is on correct node
export POD_NAME=$(kubectl get pods -l app=nginx-node-net-chaos -o jsonpath='{.items[0].metadata.name}')
export POD_NODE=$(kubectl get pod $POD_NAME -o jsonpath='{.spec.nodeName}')
echo "Pod $POD_NAME is running on node $POD_NODE"
if [ "$POD_NODE" != "$TARGET_NODE" ]; then
echo "ERROR: Pod is not on target node (expected $TARGET_NODE, got $POD_NODE)"
kubectl get pods -l app=nginx-node-net-chaos -o wide
exit 1
fi
# Setup port-forward to access nginx
echo "Setting up port-forward to nginx service..."
kubectl port-forward service/nginx-node-net-chaos-svc 8091:80 &
PORT_FORWARD_PID=$!
sleep 3 # Give port-forward time to start
# Test baseline connectivity
echo "Testing baseline connectivity..."
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 http://localhost:8091 || echo "000")
if [ "$response" != "200" ]; then
echo "ERROR: Nginx not responding correctly (got $response, expected 200)"
kubectl get pods -l app=nginx-node-net-chaos
kubectl describe pod $POD_NAME
exit 1
fi
echo "Baseline test passed: nginx responding with 200"
# Measure baseline latency
echo "Measuring baseline latency..."
baseline_start=$(date +%s%3N)
curl -s http://localhost:8091 > /dev/null || true
baseline_end=$(date +%s%3N)
baseline_latency=$((baseline_end - baseline_start))
echo "Baseline latency: ${baseline_latency}ms"
# Configure node network chaos scenario
echo "Configuring node network chaos scenario..."
yq -i '.[0].config.target="'$TARGET_NODE'"' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.namespace="default"' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.test_duration=20' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.latency="200ms"' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.loss=15' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.bandwidth="10mbit"' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.ingress=true' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.egress=true' scenarios/kube/node-network-chaos.yml
yq -i '.[0].config.force=false' scenarios/kube/node-network-chaos.yml
yq -i 'del(.[0].config.interfaces)' scenarios/kube/node-network-chaos.yml
# Prepare krkn config
export scenario_type="network_chaos_ng_scenarios"
export scenario_file="scenarios/kube/node-network-chaos.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/node_network_chaos_config.yaml
# Run krkn in background
echo "Starting krkn with node network chaos scenario..."
python3 -m coverage run -a run_kraken.py -c CI/config/node_network_chaos_config.yaml &
KRKN_PID=$!
echo "Krkn started with PID: $KRKN_PID"
# Wait for chaos to start (give it time to inject chaos)
echo "Waiting for chaos injection to begin..."
sleep 10
# Test during chaos - check for increased latency or packet loss effects
echo "Testing network behavior during chaos..."
chaos_test_count=0
chaos_success=0
for i in {1..5}; do
chaos_test_count=$((chaos_test_count + 1))
chaos_start=$(date +%s%3N)
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 10 http://localhost:8091 || echo "000")
chaos_end=$(date +%s%3N)
chaos_latency=$((chaos_end - chaos_start))
echo "Attempt $i: HTTP $response, latency: ${chaos_latency}ms"
# We expect either increased latency or some failures due to packet loss
if [ "$response" == "200" ] || [ "$response" == "000" ]; then
chaos_success=$((chaos_success + 1))
fi
sleep 2
done
echo "Chaos test results: $chaos_success/$chaos_test_count requests processed"
# Verify node-level chaos affects pod
echo "Verifying node-level chaos affects pod on $TARGET_NODE..."
# The node chaos should affect all pods on the node
# Wait for krkn to complete
echo "Waiting for krkn to complete..."
wait $KRKN_PID || true
echo "Krkn completed"
# Wait a bit for cleanup
sleep 5
# Verify recovery - nginx should respond normally again
echo "Verifying service recovery..."
recovery_attempts=0
max_recovery_attempts=10
while [ $recovery_attempts -lt $max_recovery_attempts ]; do
recovery_attempts=$((recovery_attempts + 1))
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 http://localhost:8091 || echo "000")
if [ "$response" == "200" ]; then
echo "Recovery verified: nginx responding normally (attempt $recovery_attempts)"
break
fi
echo "Recovery attempt $recovery_attempts/$max_recovery_attempts: got $response, retrying..."
sleep 3
done
if [ "$response" != "200" ]; then
echo "ERROR: Service did not recover after chaos (got $response)"
kubectl get pods -l app=nginx-node-net-chaos
kubectl describe pod $POD_NAME
exit 1
fi
# Cleanup
echo "Cleaning up test resources..."
kill $PORT_FORWARD_PID 2>/dev/null || true
kubectl delete deployment nginx-node-net-chaos --ignore-not-found=true
kubectl delete service nginx-node-net-chaos-svc --ignore-not-found=true
echo "Node network chaos test: Success"
}
functional_test_node_network_chaos
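
The script above probes the nginx service with curl and millisecond timestamps (date +%s%3N) to compare baseline latency against latency while the scenario's tc rules are active, then polls until the service recovers. A minimal Python sketch of the same probe loop, assuming the requests library and a locally reachable URL (both placeholders, not part of the test harness):

import time
import requests  # assumed dependency, mirrors the curl probes in the script above

def probe(url: str, attempts: int = 5, timeout: float = 10.0) -> list[tuple[int, float]]:
    """Return (status_code, latency_ms) per attempt; 0 stands in for curl's '000'."""
    results = []
    for _ in range(attempts):
        start = time.monotonic()
        try:
            status = requests.get(url, timeout=timeout).status_code
        except requests.RequestException:
            status = 0  # connection failed or timed out, comparable to packet loss
        results.append((status, (time.monotonic() - start) * 1000.0))
        time.sleep(2)
    return results

baseline = probe("http://localhost:8091", attempts=1)
during_chaos = probe("http://localhost:8091")  # expect higher latency or some failures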

View File

@@ -7,14 +7,15 @@ trap finish EXIT
function functional_test_pod_crash {
export scenario_type="pod_disruption_scenarios"
export scenario_file="scenarios/kind/pod_etcd.yml"
export scenario_file="scenarios/kind/pod_path_provisioner.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/pod_config.yaml
echo "Pod disruption scenario test: Success"
date
kubectl get pods -n kube-system -l component=etcd -o yaml
kubectl get pods -n local-path-storage -l app=local-path-provisioner -o yaml
}
functional_test_pod_crash

View File

@@ -1,4 +1,5 @@
source CI/tests/common.sh
trap error ERR
@@ -8,7 +9,9 @@ function functional_test_pod_error {
export scenario_type="pod_disruption_scenarios"
export scenario_file="scenarios/kind/pod_etcd.yml"
export post_config=""
# this test will check if krkn exits with an error when too many pods are targeted
yq -i '.[0].config.kill=5' scenarios/kind/pod_etcd.yml
yq -i '.[0].config.krkn_pod_recovery_time=1' scenarios/kind/pod_etcd.yml
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
cat CI/config/pod_config.yaml

View File

@@ -0,0 +1,143 @@
set -xeEo pipefail
source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_pod_network_chaos {
echo "Starting pod network chaos functional test"
# Deploy nginx workload
echo "Deploying nginx workload..."
kubectl create deployment nginx-pod-net-chaos --image=nginx:latest
kubectl expose deployment nginx-pod-net-chaos --port=80 --target-port=80 --name=nginx-pod-net-chaos-svc
# Wait for nginx to be ready
echo "Waiting for nginx pod to be ready..."
kubectl wait --for=condition=ready pod -l app=nginx-pod-net-chaos --timeout=120s
# Get pod name
export POD_NAME=$(kubectl get pods -l app=nginx-pod-net-chaos -o jsonpath='{.items[0].metadata.name}')
echo "Target pod: $POD_NAME"
# Setup port-forward to access nginx
echo "Setting up port-forward to nginx service..."
kubectl port-forward service/nginx-pod-net-chaos-svc 8090:80 &
PORT_FORWARD_PID=$!
sleep 3 # Give port-forward time to start
# Test baseline connectivity
echo "Testing baseline connectivity..."
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 http://localhost:8090 || echo "000")
if [ "$response" != "200" ]; then
echo "ERROR: Nginx not responding correctly (got $response, expected 200)"
kubectl get pods -l app=nginx-pod-net-chaos
kubectl describe pod $POD_NAME
exit 1
fi
echo "Baseline test passed: nginx responding with 200"
# Measure baseline latency
echo "Measuring baseline latency..."
baseline_start=$(date +%s%3N)
curl -s http://localhost:8090 > /dev/null || true
baseline_end=$(date +%s%3N)
baseline_latency=$((baseline_end - baseline_start))
echo "Baseline latency: ${baseline_latency}ms"
# Configure pod network chaos scenario
echo "Configuring pod network chaos scenario..."
yq -i '.[0].config.target="'$POD_NAME'"' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.namespace="default"' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.test_duration=20' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.latency="200ms"' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.loss=15' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.bandwidth="10mbit"' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.ingress=true' scenarios/kube/pod-network-chaos.yml
yq -i '.[0].config.egress=true' scenarios/kube/pod-network-chaos.yml
yq -i 'del(.[0].config.interfaces)' scenarios/kube/pod-network-chaos.yml
# Prepare krkn config
export scenario_type="network_chaos_ng_scenarios"
export scenario_file="scenarios/kube/pod-network-chaos.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_network_chaos_config.yaml
# Run krkn in background
echo "Starting krkn with pod network chaos scenario..."
python3 -m coverage run -a run_kraken.py -c CI/config/pod_network_chaos_config.yaml &
KRKN_PID=$!
echo "Krkn started with PID: $KRKN_PID"
# Wait for chaos to start (give it time to inject chaos)
echo "Waiting for chaos injection to begin..."
sleep 10
# Test during chaos - check for increased latency or packet loss effects
echo "Testing network behavior during chaos..."
chaos_test_count=0
chaos_success=0
for i in {1..5}; do
chaos_test_count=$((chaos_test_count + 1))
chaos_start=$(date +%s%3N)
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 10 http://localhost:8090 || echo "000")
chaos_end=$(date +%s%3N)
chaos_latency=$((chaos_end - chaos_start))
echo "Attempt $i: HTTP $response, latency: ${chaos_latency}ms"
# We expect either increased latency or some failures due to packet loss
if [ "$response" == "200" ] || [ "$response" == "000" ]; then
chaos_success=$((chaos_success + 1))
fi
sleep 2
done
echo "Chaos test results: $chaos_success/$chaos_test_count requests processed"
# Wait for krkn to complete
echo "Waiting for krkn to complete..."
wait $KRKN_PID || true
echo "Krkn completed"
# Wait a bit for cleanup
sleep 5
# Verify recovery - nginx should respond normally again
echo "Verifying service recovery..."
recovery_attempts=0
max_recovery_attempts=10
while [ $recovery_attempts -lt $max_recovery_attempts ]; do
recovery_attempts=$((recovery_attempts + 1))
response=$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 http://localhost:8090 || echo "000")
if [ "$response" == "200" ]; then
echo "Recovery verified: nginx responding normally (attempt $recovery_attempts)"
break
fi
echo "Recovery attempt $recovery_attempts/$max_recovery_attempts: got $response, retrying..."
sleep 3
done
if [ "$response" != "200" ]; then
echo "ERROR: Service did not recover after chaos (got $response)"
kubectl get pods -l app=nginx-pod-net-chaos
kubectl describe pod $POD_NAME
exit 1
fi
# Cleanup
echo "Cleaning up test resources..."
kill $PORT_FORWARD_PID 2>/dev/null || true
kubectl delete deployment nginx-pod-net-chaos --ignore-not-found=true
kubectl delete service nginx-pod-net-chaos-svc --ignore-not-found=true
echo "Pod network chaos test: Success"
}
functional_test_pod_network_chaos

View File

@@ -26,7 +26,7 @@ Here is an excerpt:
## Maintainer Levels
### Contributor
Contributors contributor to the community. Anyone can become a contributor by participating in discussions, reporting bugs, or contributing code or documentation.
Contributors contribute to the community. Anyone can become a contributor by participating in discussions, reporting bugs, or contributing code or documentation.
#### Responsibilities:
@@ -80,4 +80,4 @@ Represent the project in the broader open-source community.
# Credits
Sections of this documents have been borrowed from [Kubernetes governance](https://github.com/kubernetes/community/blob/master/governance.md)
Sections of this document have been borrowed from [Kubernetes governance](https://github.com/kubernetes/community/blob/master/governance.md)

View File

@@ -16,5 +16,5 @@ Following are a list of enhancements that we are planning to work on adding supp
- [x] [Krknctl - client for running Krkn scenarios with ease](https://github.com/krkn-chaos/krknctl)
- [x] [AI Chat bot to help get started with Krkn and commands](https://github.com/krkn-chaos/krkn-lightspeed)
- [ ] [Ability to roll back cluster to original state if chaos fails](https://github.com/krkn-chaos/krkn/issues/804)
- [ ] Add recovery time metrics to each scenario for each better regression analysis
- [ ] Add recovery time metrics to each scenario for better regression analysis
- [ ] [Add resiliency scoring to chaos scenarios ran on cluster](https://github.com/krkn-chaos/krkn/issues/125)

View File

@@ -40,4 +40,4 @@ The security team currently consists of the [Maintainers of Krkn](https://github
## Process and Supported Releases
The Krkn security team will investigate and provide a fix in a timely mannner depending on the severity. The fix will be included in the new release of Krkn and details will be included in the release notes.
The Krkn security team will investigate and provide a fix in a timely manner depending on the severity. The fix will be included in the new release of Krkn and details will be included in the release notes.

View File

@@ -39,7 +39,7 @@ cerberus:
Sunday:
slack_team_alias: # The slack team alias to be tagged while reporting failures in the slack channel when no watcher is assigned
custom_checks: # Relative paths of files conataining additional user defined checks
custom_checks: # Relative paths of files containing additional user defined checks
tunings:
timeout: 3 # Number of seconds before requests fail

View File

@@ -50,6 +50,8 @@ kraken:
- network_chaos_ng_scenarios:
- scenarios/kube/pod-network-filter.yml
- scenarios/kube/node-network-filter.yml
- scenarios/kube/node-network-chaos.yml
- scenarios/kube/pod-network-chaos.yml
- kubevirt_vm_outage:
- scenarios/kubevirt/kubevirt-vm-outage.yaml
@@ -93,7 +95,7 @@ telemetry:
prometheus_pod_name: "" # name of the prometheus pod (if distribution is kubernetes)
full_prometheus_backup: False # if is set to False only the /prometheus/wal folder will be downloaded.
backup_threads: 5 # number of telemetry download/upload threads
archive_path: /tmp # local path where the archive files will be temporarly stored
archive_path: /tmp # local path where the archive files will be temporarily stored
max_retries: 0 # maximum number of upload retries (if 0 will retry forever)
run_tag: '' # if set, this will be appended to the run folder in the bucket (useful to group the runs)
archive_size: 500000

View File

@@ -32,7 +32,7 @@ tunings:
telemetry:
enabled: False # enable/disables the telemetry collection feature
archive_path: /tmp # local path where the archive files will be temporarly stored
archive_path: /tmp # local path where the archive files will be temporarily stored
events_backup: False # enables/disables cluster events collection
logs_backup: False

View File

@@ -61,7 +61,7 @@ telemetry:
prometheus_backup: True # enables/disables prometheus data collection
full_prometheus_backup: False # if is set to False only the /prometheus/wal folder will be downloaded.
backup_threads: 5 # number of telemetry download/upload threads
archive_path: /tmp # local path where the archive files will be temporarly stored
archive_path: /tmp # local path where the archive files will be temporarily stored
max_retries: 0 # maximum number of upload retries (if 0 will retry forever)
run_tag: '' # if set, this will be appended to the run folder in the bucket (useful to group the runs)
archive_size: 500000 # the size of the prometheus data archive size in KB. The lower the size of archive is

View File

@@ -3,10 +3,16 @@ apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
extraPortMappings:
- containerPort: 30000
hostPort: 9090
- containerPort: 32766
hostPort: 9200
- containerPort: 30036
hostPort: 8888
- containerPort: 30037
hostPort: 8889
- containerPort: 30080
hostPort: 30080
- role: control-plane
- role: control-plane
- role: worker

View File

@@ -214,7 +214,7 @@ def metrics(
end_time=datetime.datetime.fromtimestamp(end_time), granularity=30
)
else:
logging.info('didnt match keys')
logging.info("didn't match keys")
continue
for returned_metric in metrics_result:

View File

@@ -36,7 +36,7 @@ def get_test_pods(
- pods matching the label on which network policy
need to be applied
namepsace (string)
namespace (string)
- namespace in which the pod is present
kubecli (KrknKubernetes)

View File

@@ -1,5 +1,7 @@
import re
from dataclasses import dataclass
from enum import Enum
from typing import TypeVar, Optional
class NetworkChaosScenarioType(Enum):
@@ -9,16 +11,21 @@ class NetworkChaosScenarioType(Enum):
@dataclass
class BaseNetworkChaosConfig:
supported_execution = ["serial", "parallel"]
id: str
image: str
wait_duration: int
test_duration: int
label_selector: str
service_account: str
taints: list[str]
namespace: str
instance_count: int
execution: str
namespace: str
taints: list[str]
supported_execution = ["serial", "parallel"]
interfaces: list[str]
target: str
ingress: bool
egress: bool
def validate(self) -> list[str]:
errors = []
@@ -41,12 +48,7 @@ class BaseNetworkChaosConfig:
@dataclass
class NetworkFilterConfig(BaseNetworkChaosConfig):
ingress: bool
egress: bool
interfaces: list[str]
target: str
ports: list[int]
image: str
protocols: list[str]
def validate(self) -> list[str]:
@@ -58,3 +60,30 @@ class NetworkFilterConfig(BaseNetworkChaosConfig):
f"{self.protocols} contains not allowed protocols only tcp and udp is allowed"
)
return errors
@dataclass
class NetworkChaosConfig(BaseNetworkChaosConfig):
latency: Optional[str] = None
loss: Optional[str] = None
bandwidth: Optional[str] = None
force: Optional[bool] = None
def validate(self) -> list[str]:
errors = super().validate()
latency_regex = re.compile(r"^(\d+)(us|ms|s)$")
bandwidth_regex = re.compile(r"^(\d+)(bit|kbit|mbit|gbit|tbit)$")
if self.latency:
if not (latency_regex.match(self.latency)):
errors.append(
"latency must be a number followed by `us` (microseconds) or `ms` (milliseconds), or `s` (seconds)"
)
if self.bandwidth:
if not (bandwidth_regex.match(self.bandwidth)):
errors.append(
"bandwidth must be a number followed by `bit` `kbit` or `mbit` or `tbit`"
)
if self.loss:
if "%" in self.loss or not self.loss.isdigit():
errors.append("loss must be a number followed without the `%` symbol")
return errors
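
The validate() method above accepts latency values such as 200ms, bandwidth values such as 10mbit, and loss as a bare number without the % symbol. A standalone sketch of the same checks, reusing the regexes from the diff (illustrative only, not the plugin API):

import re

latency_regex = re.compile(r"^(\d+)(us|ms|s)$")
bandwidth_regex = re.compile(r"^(\d+)(bit|kbit|mbit|gbit|tbit)$")

# sample values taken from the functional tests in this PR
assert latency_regex.match("200ms")                 # valid latency
assert bandwidth_regex.match("10mbit")              # valid bandwidth
assert "15".isdigit() and "%" not in "15"           # valid loss
assert not latency_regex.match("200")               # missing unit -> validation error
assert not bandwidth_regex.match("10 mbit")         # space not allowed -> validation error
assert not ("15%".isdigit() and "%" not in "15%")   # '%' rejected by the loss check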

View File

@@ -1,6 +1,7 @@
import abc
import logging
import queue
from typing import Tuple
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import (
@@ -27,7 +28,7 @@ class AbstractNetworkChaosModule(abc.ABC):
pass
@abc.abstractmethod
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
def get_config(self) -> Tuple[NetworkChaosScenarioType, BaseNetworkChaosConfig]:
"""
returns the common subset of settings shared by all the scenarios `BaseNetworkChaosConfig` and the type of Network
Chaos Scenario that is running (Pod Scenario or Node Scenario)
@@ -41,6 +42,42 @@ class AbstractNetworkChaosModule(abc.ABC):
pass
def get_node_targets(self, config: BaseNetworkChaosConfig):
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_nodes(
self.base_network_config.label_selector
)
else:
if not config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
if config.target not in node_info:
raise Exception(f"node {config.target} not found, aborting")
return [config.target]
def get_pod_targets(self, config: BaseNetworkChaosConfig):
if not config.namespace:
raise Exception("namespace not specified, aborting")
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_pods(
config.namespace, config.label_selector
)
else:
if not config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
config.target, config.namespace
):
raise Exception(
f"pod {config.target} not found in namespace {config.namespace}"
)
return [config.target]
def __init__(
self,
base_network_config: BaseNetworkChaosConfig,

View File

@@ -0,0 +1,156 @@
import queue
import time
from typing import Tuple
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
NetworkChaosConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import (
log_info,
setup_network_chaos_ng_scenario,
log_error,
log_warning,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos import (
common_set_limit_rules,
common_delete_limit_rules,
node_qdisc_is_simple,
)
class NodeNetworkChaosModule(AbstractNetworkChaosModule):
def __init__(self, config: NetworkChaosConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
network_chaos_pod_name = f"node-network-chaos-{get_random_string(5)}"
container_name = f"fedora-container-{get_random_string(5)}"
log_info(
f"creating workload to inject network chaos in node {target} network"
f"latency:{str(self.config.latency) if self.config.latency else '0'}, "
f"packet drop:{str(self.config.loss) if self.config.loss else '0'} "
f"bandwidth restriction:{str(self.config.bandwidth) if self.config.bandwidth else '0'} ",
parallel,
network_chaos_pod_name,
)
_, interfaces = setup_network_chaos_ng_scenario(
self.config,
target,
network_chaos_pod_name,
container_name,
self.kubecli.get_lib_kubernetes(),
target,
parallel,
True,
)
if len(self.config.interfaces) == 0:
if len(interfaces) == 0:
log_error(
"no network interface found in pod, impossible to execute the network chaos scenario",
parallel,
network_chaos_pod_name,
)
return
log_info(
f"detected network interfaces: {','.join(interfaces)}",
parallel,
network_chaos_pod_name,
)
else:
interfaces = self.config.interfaces
log_info(
f"targeting node {target}",
parallel,
network_chaos_pod_name,
)
complex_config_interfaces = []
for interface in interfaces:
is_simple = node_qdisc_is_simple(
self.kubecli.get_lib_kubernetes(),
network_chaos_pod_name,
self.config.namespace,
interface,
)
if not is_simple:
complex_config_interfaces.append(interface)
if len(complex_config_interfaces) > 0 and not self.config.force:
log_warning(
f"node already has tc rules set for {','.join(complex_config_interfaces)}, this action might damage the cluster,"
"if you want to continue set `force` to True in the node network "
"chaos scenario config file and try again"
)
else:
if len(complex_config_interfaces) > 0 and self.config.force:
log_warning(
f"you are forcing node network configuration override for {','.join(complex_config_interfaces)},"
"this action might lead to unpredictable node behaviour, "
"you're doing it in your own responsibility"
"waiting 10 seconds before continuing"
)
time.sleep(10)
common_set_limit_rules(
self.config.egress,
self.config.ingress,
interfaces,
self.config.bandwidth,
self.config.latency,
self.config.loss,
parallel,
network_chaos_pod_name,
self.kubecli.get_lib_kubernetes(),
network_chaos_pod_name,
self.config.namespace,
None,
)
time.sleep(self.config.test_duration)
log_info("removing tc rules", parallel, network_chaos_pod_name)
common_delete_limit_rules(
self.config.egress,
self.config.ingress,
interfaces,
network_chaos_pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
None,
parallel,
network_chaos_pod_name,
)
self.kubecli.get_lib_kubernetes().delete_pod(
network_chaos_pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
raise e
else:
error_queue.put(str(e))
def get_config(self) -> Tuple[NetworkChaosScenarioType, BaseNetworkChaosConfig]:
return NetworkChaosScenarioType.Node, self.config
def get_targets(self) -> list[str]:
return self.get_node_targets(self.config)
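
For reference, the fields this module's run() method reads (test_duration, latency, loss, bandwidth, ingress/egress, force, interfaces) arrive through a NetworkChaosConfig built from the scenario file. A hedged sketch of such a scenario config as a Python dict, using the values the functional test sets with yq; remaining base fields (image, execution, label_selector, and so on) are omitted here:

# illustrative only; roughly what scenarios/kube/node-network-chaos.yml carries after the yq edits above
node_network_chaos_config = {
    "id": "node_network_chaos",   # selects NodeNetworkChaosModule in the factory
    "target": "kind-worker",      # placeholder node name
    "namespace": "default",
    "test_duration": 20,          # seconds the tc rules stay applied
    "latency": "200ms",           # netem delay
    "loss": 15,                   # packet loss percentage, written without '%'
    "bandwidth": "10mbit",        # htb rate limit
    "ingress": True,
    "egress": True,
    "force": False,               # refuse to override an existing tc tree unless True
}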

View File

@@ -1,5 +1,6 @@
import queue
import time
from typing import Tuple
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
@@ -11,14 +12,16 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
from krkn.scenario_plugins.network_chaos_ng.modules.utils import (
log_info,
deploy_network_chaos_ng_pod,
get_pod_default_interface,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
apply_network_rules,
clean_network_rules,
generate_rules,
get_default_interface,
)
@@ -41,7 +44,7 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
)
pod_name = f"node-filter-{get_random_string(5)}"
deploy_network_filter_pod(
deploy_network_chaos_ng_pod(
self.config,
target,
pod_name,
@@ -50,7 +53,7 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
if len(self.config.interfaces) == 0:
interfaces = [
get_default_interface(
get_pod_default_interface(
pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
@@ -108,21 +111,8 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
def get_config(self) -> Tuple[NetworkChaosScenarioType, BaseNetworkChaosConfig]:
return NetworkChaosScenarioType.Node, self.config
def get_targets(self) -> list[str]:
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_nodes(
self.base_network_config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
if self.config.target not in node_info:
raise Exception(f"node {self.config.target} not found, aborting")
return [self.config.target]
return self.get_node_targets(self.config)

View File

@@ -0,0 +1,159 @@
import queue
import time
from typing import Tuple
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
NetworkChaosConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import (
log_info,
setup_network_chaos_ng_scenario,
log_error,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos import (
common_set_limit_rules,
common_delete_limit_rules,
)
class PodNetworkChaosModule(AbstractNetworkChaosModule):
def __init__(self, config: NetworkChaosConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
network_chaos_pod_name = f"pod-network-chaos-{get_random_string(5)}"
container_name = f"fedora-container-{get_random_string(5)}"
pod_info = self.kubecli.get_lib_kubernetes().get_pod_info(
target, self.config.namespace
)
log_info(
f"creating workload to inject network chaos in pod {target} network"
f"latency:{str(self.config.latency) if self.config.latency else '0'}, "
f"packet drop:{str(self.config.loss) if self.config.loss else '0'} "
f"bandwidth restriction:{str(self.config.bandwidth) if self.config.bandwidth else '0'} ",
parallel,
network_chaos_pod_name,
)
if not pod_info:
raise Exception(
f"impossible to retrieve infos for pod {target} namespace {self.config.namespace}"
)
container_ids, interfaces = setup_network_chaos_ng_scenario(
self.config,
pod_info.nodeName,
network_chaos_pod_name,
container_name,
self.kubecli.get_lib_kubernetes(),
target,
parallel,
False,
)
if len(self.config.interfaces) == 0:
if len(interfaces) == 0:
log_error(
"no network interface found in pod, impossible to execute the network chaos scenario",
parallel,
network_chaos_pod_name,
)
return
log_info(
f"detected network interfaces: {','.join(interfaces)}",
parallel,
network_chaos_pod_name,
)
else:
interfaces = self.config.interfaces
if len(container_ids) == 0:
raise Exception(
f"impossible to resolve container id for pod {target} namespace {self.config.namespace}"
)
log_info(
f"targeting container {container_ids[0]}",
parallel,
network_chaos_pod_name,
)
pids = self.kubecli.get_lib_kubernetes().get_pod_pids(
base_pod_name=network_chaos_pod_name,
base_pod_namespace=self.config.namespace,
base_pod_container_name=container_name,
pod_name=target,
pod_namespace=self.config.namespace,
pod_container_id=container_ids[0],
)
if not pids:
raise Exception(f"impossible to resolve pid for pod {target}")
log_info(
f"resolved pids {pids} in node {pod_info.nodeName} for pod {target}",
parallel,
network_chaos_pod_name,
)
common_set_limit_rules(
self.config.egress,
self.config.ingress,
interfaces,
self.config.bandwidth,
self.config.latency,
self.config.loss,
parallel,
network_chaos_pod_name,
self.kubecli.get_lib_kubernetes(),
network_chaos_pod_name,
self.config.namespace,
pids,
)
time.sleep(self.config.test_duration)
log_info("removing tc rules", parallel, network_chaos_pod_name)
common_delete_limit_rules(
self.config.egress,
self.config.ingress,
interfaces,
network_chaos_pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
pids,
parallel,
network_chaos_pod_name,
)
self.kubecli.get_lib_kubernetes().delete_pod(
network_chaos_pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
raise e
else:
error_queue.put(str(e))
def get_config(self) -> Tuple[NetworkChaosScenarioType, BaseNetworkChaosConfig]:
return NetworkChaosScenarioType.Pod, self.config
def get_targets(self) -> list[str]:
return self.get_pod_targets(self.config)
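
The pod variant differs from the node variant mainly in that it resolves the target pod's container PIDs and applies the tc rules inside that pod's network namespace instead of on the node's host interfaces. A sketch of how one rule gets wrapped, matching namespaced_tc_commands further down in this PR (the PID is a placeholder):

pid = "12345"  # placeholder PID resolved via get_pod_pids() for the target container
rule = "tc qdisc add dev eth0 root handle 100: htb default 1"
namespaced_rule = f"nsenter --target {pid} --net -- {rule}"
# executed inside the helper pod, so the qdisc lands on the target pod's eth0,
# not on the node's interfaces
print(namespaced_rule)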

View File

@@ -1,6 +1,6 @@
import logging
import queue
import time
from typing import Tuple
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
@@ -13,12 +13,17 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info, log_error
from krkn.scenario_plugins.network_chaos_ng.modules.utils import (
log_info,
log_error,
deploy_network_chaos_ng_pod,
get_pod_default_interface,
setup_network_chaos_ng_scenario,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
generate_namespaced_rules,
apply_network_rules,
clean_network_rules_namespaced,
generate_namespaced_rules,
)
@@ -50,22 +55,18 @@ class PodNetworkFilterModule(AbstractNetworkChaosModule):
f"impossible to retrieve infos for pod {self.config.target} namespace {self.config.namespace}"
)
deploy_network_filter_pod(
container_ids, interfaces = setup_network_chaos_ng_scenario(
self.config,
pod_info.nodeName,
pod_name,
self.kubecli.get_lib_kubernetes(),
container_name,
host_network=False,
self.kubecli.get_lib_kubernetes(),
target,
parallel,
False,
)
if len(self.config.interfaces) == 0:
interfaces = (
self.kubecli.get_lib_kubernetes().list_pod_network_interfaces(
target, self.config.namespace
)
)
if len(interfaces) == 0:
log_error(
"no network interface found in pod, impossible to execute the network filter scenario",
@@ -157,26 +158,8 @@ class PodNetworkFilterModule(AbstractNetworkChaosModule):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
def get_config(self) -> Tuple[NetworkChaosScenarioType, BaseNetworkChaosConfig]:
return NetworkChaosScenarioType.Pod, self.config
def get_targets(self) -> list[str]:
if not self.config.namespace:
raise Exception("namespace not specified, aborting")
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_pods(
self.config.namespace, self.config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
self.config.target, self.config.namespace
):
raise Exception(
f"pod {self.config.target} not found in namespace {self.config.namespace}"
)
return [self.config.target]
return self.get_pod_targets(self.config)

View File

@@ -1,4 +1,15 @@
import logging
import os
from typing import Tuple
import yaml
from jinja2 import FileSystemLoader, Environment
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import Pod
from krkn.scenario_plugins.network_chaos_ng.models import (
BaseNetworkChaosConfig,
)
def log_info(message: str, parallel: bool = False, node_name: str = ""):
@@ -29,3 +40,101 @@ def log_warning(message: str, parallel: bool = False, node_name: str = ""):
logging.warning(f"[{node_name}]: {message}")
else:
logging.warning(message)
def deploy_network_chaos_ng_pod(
config: BaseNetworkChaosConfig,
target_node: str,
pod_name: str,
kubecli: KrknKubernetes,
container_name: str = "fedora",
host_network: bool = True,
):
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("templates/network-chaos.j2")
tolerations = []
for taint in config.taints:
key_value_part, effect = taint.split(":", 1)
if "=" in key_value_part:
key, value = key_value_part.split("=", 1)
operator = "Equal"
else:
key = key_value_part
value = None
operator = "Exists"
toleration = {
"key": key,
"operator": operator,
"effect": effect,
}
if value is not None:
toleration["value"] = value
tolerations.append(toleration)
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=config.namespace,
host_network=host_network,
target=target_node,
container_name=container_name,
workload_image=config.image,
taints=tolerations,
service_account=config.service_account,
)
)
kubecli.create_pod(pod_body, config.namespace, 300)
def get_pod_default_interface(
pod_name: str, namespace: str, kubecli: KrknKubernetes
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
return output.replace("\n", "")
def setup_network_chaos_ng_scenario(
config: BaseNetworkChaosConfig,
node_name: str,
pod_name: str,
container_name: str,
kubecli: KrknKubernetes,
target: str,
parallel: bool,
host_network: bool,
) -> Tuple[list[str], list[str]]:
deploy_network_chaos_ng_pod(
config,
node_name,
pod_name,
kubecli,
container_name,
host_network=host_network,
)
if len(config.interfaces) == 0:
interfaces = [
get_pod_default_interface(
pod_name,
config.namespace,
kubecli,
)
]
log_info(f"detected default interface {interfaces[0]}", parallel, target)
else:
interfaces = config.interfaces
# if not host_network means that the target is a pod so container_ids need to be resolved
# otherwise it's not needed
if not host_network:
container_ids = kubecli.get_container_ids(target, config.namespace)
else:
container_ids = []
return container_ids, interfaces
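
deploy_network_chaos_ng_pod above turns each taint string from the scenario config into a pod toleration: key=value:Effect becomes an Equal toleration and key:Effect becomes an Exists toleration. A small sketch of that parsing on a sample taint string (the taint itself is a placeholder):

taint = "node-role.kubernetes.io/control-plane:NoSchedule"  # placeholder taint string
key_value_part, effect = taint.split(":", 1)
if "=" in key_value_part:
    key, value = key_value_part.split("=", 1)
    operator = "Equal"
else:
    key, value, operator = key_value_part, None, "Exists"
toleration = {"key": key, "operator": operator, "effect": effect}
if value is not None:
    toleration["value"] = value
# -> {'key': 'node-role.kubernetes.io/control-plane', 'operator': 'Exists', 'effect': 'NoSchedule'}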

View File

@@ -0,0 +1,263 @@
import subprocess
import logging
from typing import Optional
from krkn_lib.k8s import KrknKubernetes
from krkn.scenario_plugins.network_chaos_ng.modules.utils import (
log_info,
log_warning,
log_error,
)
ROOT_HANDLE = "100:"
CLASS_ID = "100:1"
NETEM_HANDLE = "101:"
def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(cmd, check=check, text=True, capture_output=True)
def tc_node(args: list[str]) -> subprocess.CompletedProcess:
return run(["tc"] + args)
def get_build_tc_tree_commands(devs: list[str]) -> list[str]:
tree = []
for dev in devs:
tree.append(f"tc qdisc add dev {dev} root handle {ROOT_HANDLE} htb default 1")
tree.append(
f"tc class add dev {dev} parent {ROOT_HANDLE} classid {CLASS_ID} htb rate 1gbit",
)
tree.append(
f"tc qdisc add dev {dev} parent {CLASS_ID} handle {NETEM_HANDLE} netem delay 0ms loss 0%",
)
return tree
def namespaced_tc_commands(pids: list[str], commands: list[str]) -> list[str]:
return [
f"nsenter --target {pid} --net -- {rule}" for pid in pids for rule in commands
]
def get_egress_shaping_comand(
devices: list[str],
rate_mbit: Optional[str],
delay_ms: Optional[str],
loss_pct: Optional[str],
) -> list[str]:
rate_commands = []
rate = f"{rate_mbit}mbit" if rate_mbit is not None else "1gbit"
d = delay_ms if delay_ms is not None else 0
l = loss_pct if loss_pct is not None else 0
for dev in devices:
rate_commands.append(
f"tc class change dev {dev} parent {ROOT_HANDLE} classid {CLASS_ID} htb rate {rate}"
)
rate_commands.append(
f"tc qdisc change dev {dev} parent {CLASS_ID} handle {NETEM_HANDLE} netem delay {d}ms loss {l}%"
)
return rate_commands
def get_clear_egress_shaping_commands(devices: list[str]) -> list[str]:
return [f"tc qdisc del dev {dev} root handle {ROOT_HANDLE}" for dev in devices]
def get_ingress_shaping_commands(
devs: list[str],
rate_mbit: Optional[str],
delay_ms: Optional[str],
loss_pct: Optional[str],
ifb_dev: str = "ifb0",
) -> list[str]:
rate_commands = [
f"modprobe ifb || true",
f"ip link add {ifb_dev} type ifb || true",
f"ip link set {ifb_dev} up || true",
]
for dev in devs:
rate_commands.append(f"tc qdisc add dev {dev} handle ffff: ingress || true")
rate_commands.append(
f"tc filter add dev {dev} parent ffff: protocol all prio 1 "
f"matchall action mirred egress redirect dev {ifb_dev} || true"
)
rate_commands.append(
f"tc qdisc add dev {ifb_dev} root handle {ROOT_HANDLE} htb default 1 || true"
)
rate_commands.append(
f"tc class add dev {ifb_dev} parent {ROOT_HANDLE} classid {CLASS_ID} "
f"htb rate {rate_mbit if rate_mbit else '1gbit'} || true"
)
rate_commands.append(
f"tc qdisc add dev {ifb_dev} parent {CLASS_ID} handle {NETEM_HANDLE} "
f"netem delay {delay_ms if delay_ms else '0ms'} "
f"loss {loss_pct if loss_pct else '0'}% || true"
)
return rate_commands
def get_clear_ingress_shaping_commands(
devs: list[str],
ifb_dev: str = "ifb0",
) -> list[str]:
cmds: list[str] = []
for dev in devs:
cmds.append(f"tc qdisc del dev {dev} ingress || true")
cmds.append(f"tc qdisc del dev {ifb_dev} root handle {ROOT_HANDLE} || true")
cmds.append(f"ip link set {ifb_dev} down || true")
cmds.append(f"ip link del {ifb_dev} || true")
return cmds
def node_qdisc_is_simple(
kubecli: KrknKubernetes, pod_name, namespace: str, interface: str
) -> bool:
result = kubecli.exec_cmd_in_pod(
[f"tc qdisc show dev {interface}"], pod_name, namespace
)
lines = [l for l in result.splitlines() if l.strip()]
if len(lines) != 1:
return False
line = lines[0].lower()
if "htb" in line or "netem" in line or "clsact" in line:
return False
return True
def common_set_limit_rules(
egress: bool,
ingress: bool,
interfaces: list[str],
bandwidth: str,
latency: str,
loss: str,
parallel: bool,
target: str,
kubecli: KrknKubernetes,
network_chaos_pod_name: str,
namespace: str,
pids: Optional[list[str]] = None,
):
if egress:
build_tree_commands = get_build_tc_tree_commands(interfaces)
if pids:
build_tree_commands = namespaced_tc_commands(pids, build_tree_commands)
egress_shaping_commands = get_egress_shaping_comand(
interfaces,
bandwidth,
latency,
loss,
)
if pids:
egress_shaping_commands = namespaced_tc_commands(
pids, egress_shaping_commands
)
error_counter = 0
for rule in build_tree_commands:
result = kubecli.exec_cmd_in_pod([rule], network_chaos_pod_name, namespace)
if not result:
log_info(f"created tc tree in pod: {rule}", parallel, target)
else:
error_counter += 1
if len(build_tree_commands) == error_counter:
log_error(
"failed to apply egress shaping rules on cluster", parallel, target
)
for rule in egress_shaping_commands:
result = kubecli.exec_cmd_in_pod([rule], network_chaos_pod_name, namespace)
if not result:
log_info(f"applied egress shaping rules: {rule}", parallel, target)
if ingress:
ingress_shaping_commands = get_ingress_shaping_commands(
interfaces,
bandwidth,
latency,
loss,
)
if pids:
ingress_shaping_commands = namespaced_tc_commands(
pids, ingress_shaping_commands
)
error_counter = 0
for rule in ingress_shaping_commands:
result = kubecli.exec_cmd_in_pod([rule], network_chaos_pod_name, namespace)
if not result:
log_info(
f"applied ingress shaping rule: {rule}",
parallel,
network_chaos_pod_name,
)
else:
error_counter += 1
if len(ingress_shaping_commands) == error_counter:
log_error(
"failed to apply ingress shaping rules on cluster", parallel, target
)
def common_delete_limit_rules(
egress: bool,
ingress: bool,
interfaces: list[str],
network_chaos_pod_name: str,
network_chaos_namespace: str,
kubecli: KrknKubernetes,
pids: Optional[list[str]],
parallel: bool,
target: str,
):
if egress:
clear_commands = get_clear_egress_shaping_commands(interfaces)
if pids:
clear_commands = namespaced_tc_commands(pids, clear_commands)
error_counter = 0
for rule in clear_commands:
result = kubecli.exec_cmd_in_pod(
[rule], network_chaos_pod_name, network_chaos_namespace
)
if not result:
log_info(f"removed egress shaping rule : {rule}", parallel, target)
else:
error_counter += 1
if len(clear_commands) == error_counter:
log_error(
"failed to remove egress shaping rules on cluster", parallel, target
)
if ingress:
clear_commands = get_clear_ingress_shaping_commands(interfaces)
if pids:
clear_commands = namespaced_tc_commands(pids, clear_commands)
error_counter = 0
for rule in clear_commands:
result = kubecli.exec_cmd_in_pod(
[rule], network_chaos_pod_name, network_chaos_namespace
)
if not result:
log_info(f"removed ingress shaping rule: {rule}", parallel, target)
else:
error_counter += 1
if len(clear_commands) == error_counter:
log_error(
"failed to remove ingress shaping rules on cluster", parallel, target
)
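
For egress, get_build_tc_tree_commands() sets up a three-level tc tree per interface: an htb root qdisc (handle 100:), one htb class (100:1) carrying the rate limit, and a netem leaf qdisc (101:) for delay and loss; shaping values are later adjusted with tc ... change, and the whole tree is removed by deleting the root. For ingress, traffic is redirected to an ifb device and shaped on that device instead. Rendered for a single interface, the tree-building commands look like this (eth0 is a placeholder):

# sketch: the tc tree built per interface, rendered for dev="eth0"
# with the handles defined above (ROOT_HANDLE, CLASS_ID, NETEM_HANDLE)
egress_tree = [
    "tc qdisc add dev eth0 root handle 100: htb default 1",            # htb root
    "tc class add dev eth0 parent 100: classid 100:1 htb rate 1gbit",  # rate-limit class
    "tc qdisc add dev eth0 parent 100:1 handle 101: netem delay 0ms loss 0%",  # netem leaf
]
# shaping is then applied with `tc class change` / `tc qdisc change` on the same handles,
# and torn down with "tc qdisc del dev eth0 root handle 100:"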

View File

@@ -1,7 +1,5 @@
import os
from typing import Tuple
import yaml
from jinja2 import FileSystemLoader, Environment
from krkn_lib.k8s import KrknKubernetes
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
@@ -10,7 +8,7 @@ from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
def generate_rules(
interfaces: list[str], config: NetworkFilterConfig
) -> (list[str], list[str]):
) -> Tuple[list[str], list[str]]:
input_rules = []
output_rules = []
for interface in interfaces:
@@ -29,72 +27,6 @@ def generate_rules(
return input_rules, output_rules
def generate_namespaced_rules(
interfaces: list[str], config: NetworkFilterConfig, pids: list[str]
) -> (list[str], list[str]):
namespaced_input_rules: list[str] = []
namespaced_output_rules: list[str] = []
input_rules, output_rules = generate_rules(interfaces, config)
for pid in pids:
ns_input_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
]
ns_output_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
]
namespaced_input_rules.extend(ns_input_rules)
namespaced_output_rules.extend(ns_output_rules)
return namespaced_input_rules, namespaced_output_rules
def deploy_network_filter_pod(
config: NetworkFilterConfig,
target_node: str,
pod_name: str,
kubecli: KrknKubernetes,
container_name: str = "fedora",
host_network: bool = True,
):
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("templates/network-chaos.j2")
tolerations = []
for taint in config.taints:
key_value_part, effect = taint.split(":", 1)
if "=" in key_value_part:
key, value = key_value_part.split("=", 1)
operator = "Equal"
else:
key = key_value_part
value = None
operator = "Exists"
toleration = {
"key": key,
"operator": operator,
"effect": effect,
}
if value is not None:
toleration["value"] = value
tolerations.append(toleration)
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=config.namespace,
host_network=host_network,
target=target_node,
container_name=container_name,
workload_image=config.image,
taints=tolerations,
service_account=config.service_account,
)
)
kubecli.create_pod(pod_body, config.namespace, 300)
def apply_network_rules(
kubecli: KrknKubernetes,
input_rules: list[str],
@@ -153,9 +85,20 @@ def clean_network_rules_namespaced(
)
def get_default_interface(
pod_name: str, namespace: str, kubecli: KrknKubernetes
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
return output.replace("\n", "")
def generate_namespaced_rules(
interfaces: list[str], config: NetworkFilterConfig, pids: list[str]
) -> Tuple[list[str], list[str]]:
namespaced_input_rules: list[str] = []
namespaced_output_rules: list[str] = []
input_rules, output_rules = generate_rules(interfaces, config)
for pid in pids:
ns_input_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
]
ns_output_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
]
namespaced_input_rules.extend(ns_input_rules)
namespaced_output_rules.extend(ns_output_rules)
return namespaced_input_rules, namespaced_output_rules

View File

@@ -1,17 +1,31 @@
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkFilterConfig,
NetworkChaosConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos import (
NodeNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import (
NodeNetworkFilterModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos import (
PodNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_filter import (
PodNetworkFilterModule,
)
supported_modules = ["node_network_filter", "pod_network_filter"]
supported_modules = [
"node_network_filter",
"pod_network_filter",
"pod_network_chaos",
"node_network_chaos",
]
class NetworkChaosFactory:
@@ -26,14 +40,28 @@ class NetworkChaosFactory:
raise Exception(f"{config['id']} is not a supported network chaos module")
if config["id"] == "node_network_filter":
config = NetworkFilterConfig(**config)
errors = config.validate()
scenario_config = NetworkFilterConfig(**config)
errors = scenario_config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return NodeNetworkFilterModule(config, kubecli)
return NodeNetworkFilterModule(scenario_config, kubecli)
if config["id"] == "pod_network_filter":
config = NetworkFilterConfig(**config)
errors = config.validate()
scenario_config = NetworkFilterConfig(**config)
errors = scenario_config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return PodNetworkFilterModule(config, kubecli)
return PodNetworkFilterModule(scenario_config, kubecli)
if config["id"] == "pod_network_chaos":
scenario_config = NetworkChaosConfig(**config)
errors = scenario_config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return PodNetworkChaosModule(scenario_config, kubecli)
if config["id"] == "node_network_chaos":
scenario_config = NetworkChaosConfig(**config)
errors = scenario_config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return NodeNetworkChaosModule(scenario_config, kubecli)
else:
raise Exception(f"invalid network chaos id {config['id']}")

View File

@@ -11,7 +11,7 @@ def get_node_by_name(node_name_list, kubecli: KrknKubernetes):
for node_name in node_name_list:
if node_name not in killable_nodes:
logging.info(
f"Node with provided ${node_name} does not exist or the node might "
f"Node with provided {node_name} does not exist or the node might "
"be in NotReady state."
)
return

View File

@@ -196,13 +196,11 @@ class NodeActionsScenarioPlugin(AbstractScenarioPlugin):
exclude_nodes = common_node_functions.get_node(
exclude_label, 0, kubecli
)
for node in nodes:
if node in exclude_nodes:
logging.info(
f"excluding node {node} with exclude label {exclude_nodes}"
)
nodes.remove(node)
if exclude_nodes:
logging.info(
f"excluding nodes {exclude_nodes} with exclude label {exclude_label}"
)
nodes = [node for node in nodes if node not in exclude_nodes]
# GCP api doesn't support multiprocessing calls, will only actually run 1
if parallel_nodes:
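# Why the list comprehension matters, shown in isolation with hypothetical
# node names (this mirrors the regression test for issue #1058 further down):
nodes = ["node-A", "node-B", "node-C", "node-D"]
exclude_nodes = ["node-A", "node-B"]
# Old pattern: removing from the list while iterating it skips the element
# that slides into the removed item's slot, so "node-B" survives.
buggy = list(nodes)
for node in buggy:
    if node in exclude_nodes:
        buggy.remove(node)
# buggy == ["node-B", "node-C", "node-D"]
# New pattern: build a fresh list instead of mutating during iteration.
fixed = [node for node in nodes if node not in exclude_nodes]
# fixed == ["node-C", "node-D"]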

View File

@@ -1,7 +1,7 @@
import importlib
import inspect
import pkgutil
from typing import Type, Tuple, Optional
from typing import Type, Tuple, Optional, Any
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
@@ -11,7 +11,7 @@ class ScenarioPluginNotFound(Exception):
class ScenarioPluginFactory:
loaded_plugins: dict[str, any] = {}
loaded_plugins: dict[str, Any] = {}
failed_plugins: list[Tuple[str, str, str]] = []
package_name = None

View File

@@ -77,7 +77,7 @@ class HealthChecker:
success_response = {
"url": url,
"status": True,
"status_code": response["status_code"],
"status_code": health_check_tracker[url]["status_code"],
"start_timestamp": health_check_tracker[url]["start_timestamp"].isoformat(),
"end_timestamp": health_check_end_time_stamp.isoformat(),
"duration": duration

View File

@@ -49,7 +49,11 @@ class VirtChecker:
for vmi in self.kube_vm_plugin.vmis_list:
node_name = vmi.get("status",{}).get("nodeName")
vmi_name = vmi.get("metadata",{}).get("name")
ip_address = vmi.get("status",{}).get("interfaces",[])[0].get("ipAddress")
interfaces = vmi.get("status",{}).get("interfaces",[])
if not interfaces:
logging.warning(f"VMI {vmi_name} has no network interfaces, skipping")
continue
ip_address = interfaces[0].get("ipAddress")
namespace = vmi.get("metadata",{}).get("namespace")
# If node_name_list exists, only add if node name is in list
@@ -74,7 +78,8 @@ class VirtChecker:
else:
logging.debug(f"Disconnected access for {ip_address} on {worker_name} is failed: {output}")
vmi = self.kube_vm_plugin.get_vmi(vmi_name,self.namespace)
new_ip_address = vmi.get("status",{}).get("interfaces",[])[0].get("ipAddress")
interfaces = vmi.get("status",{}).get("interfaces",[])
new_ip_address = interfaces[0].get("ipAddress") if interfaces else None
new_node_name = vmi.get("status",{}).get("nodeName")
# if vm gets deleted, it'll start up with a new ip address
if new_ip_address != ip_address:
@@ -102,7 +107,7 @@ class VirtChecker:
def get_vm_access(self, vm_name: str = '', namespace: str = ''):
"""
This method returns True when the VM is access and an error message when it is not, using virtctl protocol
This method returns True when the VM is accessible and an error message when it is not, using virtctl protocol
:param vm_name:
:param namespace:
:return: virtctl_status 'True' if successful, or an error message if it fails.

View File

@@ -1,23 +1,23 @@
aliyun-python-sdk-core==2.13.36
aliyun-python-sdk-ecs==4.24.25
arcaflow-plugin-sdk==0.14.0
boto3==1.28.61
boto3>=1.34.0 # Updated to support urllib3 2.x
azure-identity==1.16.1
azure-keyvault==4.2.0
azure-mgmt-compute==30.5.0
azure-mgmt-network==27.0.0
coverage==7.6.12
datetime==5.4
docker>=6.0,<7.0 # docker 7.0+ has breaking changes with Unix sockets
docker>=6.0,<7.0 # docker 7.0+ has breaking changes; works with requests<2.32
gitpython==3.1.41
google-auth==2.37.0
google-cloud-compute==1.22.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
ibm_cloud_sdk_core>=3.20.0 # Requires urllib3>=2.1.0 (compatible with updated boto3)
ibm_vpc==0.26.3 # Requires ibm_cloud_sdk_core
jinja2==3.1.6
krkn-lib==6.0.1
lxml==5.1.0
kubernetes==34.1.0
krkn-lib==6.0.3
numpy==1.26.4
pandas==2.2.0
openshift-client==1.0.21
@@ -29,11 +29,13 @@ python-ipmi==0.5.4
python-openstackclient==6.5.0
requests<2.32 # requests 2.32+ breaks Unix socket support (http+docker scheme)
requests-unixsocket>=0.4.0 # Required for Docker Unix socket support
urllib3>=2.1.0,<2.4.0 # Compatible with all dependencies
service_identity==24.1.0
PyYAML==6.0.1
setuptools==78.1.1
wheel>=0.44.0
zope.interface==6.1
colorlog==6.10.1
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability

View File

@@ -6,6 +6,7 @@ import sys
import yaml
import logging
import optparse
from colorlog import ColoredFormatter
import pyfiglet
import uuid
import time
@@ -652,18 +653,30 @@ if __name__ == "__main__":
# If no command or regular execution, continue with existing logic
report_file = options.output
tee_handler = TeeLogHandler()
fmt = "%(asctime)s [%(levelname)s] %(message)s"
plain = logging.Formatter(fmt)
colored = ColoredFormatter(
"%(asctime)s [%(log_color)s%(levelname)s%(reset)s] %(message)s",
log_colors={'DEBUG': 'white', 'INFO': 'white', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red'},
reset=True, style='%'
)
file_handler = logging.FileHandler(report_file, mode="w")
file_handler.setFormatter(plain)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(colored)
tee_handler.setFormatter(plain)
error_collection_handler = ErrorCollectionHandler(level=logging.ERROR)
handlers = [
logging.FileHandler(report_file, mode="w"),
logging.StreamHandler(),
file_handler,
stream_handler,
tee_handler,
error_collection_handler,
]
logging.basicConfig(
level=logging.DEBUG if options.debug else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=handlers,
)
option_error = False

View File

@@ -0,0 +1,6 @@
- id: kill-pods
config:
namespace_pattern: "local-path-storage"
label_selector: "app=local-path-provisioner"
krkn_pod_recovery_time: 20
kill: 1

View File

@@ -0,0 +1,18 @@
- id: node_network_chaos
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: ""
service_account: ""
taints: []
namespace: 'default'
instance_count: 1
target: "<node_name>"
execution: parallel
interfaces: []
ingress: true
egress: true
latency: 0s # supported units are us (microseconds), ms, s
loss: 10 # percentage
bandwidth: 1gbit # supported units are bit kbit mbit gbit tbit
force: false
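# Rough mapping, for orientation only (inferred from the egress-shaping unit
# tests added in this change set, not from documented behavior): latency, loss
# and bandwidth above end up in tc/netem shaping commands of the form
#   tc class change dev <iface> parent 100: classid 100:1 htb rate <bandwidth>
#   tc qdisc change dev <iface> parent 100:1 handle 101: netem delay <latency> loss <loss>%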

View File

@@ -4,7 +4,7 @@
test_duration: 10
label_selector: "<node_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
taints: []
namespace: 'default'
instance_count: 1
execution: parallel

View File

@@ -0,0 +1,17 @@
- id: pod_network_chaos
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: ""
service_account: ""
taints: []
namespace: 'default'
instance_count: 1
target: "<pod_name>"
execution: parallel
interfaces: []
ingress: true
egress: true
latency: 0s # supported units are us (microseconds), ms, s
loss: 10 # percentage
bandwidth: 1gbit # supported units are bit kbit mbit gbit tbit

View File

@@ -4,7 +4,7 @@
test_duration: 60
label_selector: "<pod_selector>"
service_account: ""
taints: [] # example ["node-role.kubernetes.io/master:NoSchedule"]
taints: []
namespace: 'default'
instance_count: 1
execution: parallel

View File

@@ -743,6 +743,45 @@ class TestNodeActionsScenarioPlugin(unittest.TestCase):
mock_logging.assert_called()
self.assertIn("Error on pool multiprocessing", str(mock_logging.call_args))
@patch('krkn.scenario_plugins.node_actions.node_actions_scenario_plugin.common_node_functions')
def test_inject_node_scenario_excludes_consecutive_nodes(self, mock_common_funcs):
"""
Regression test for issue #1058 - node exclusion was skipping nodes
when consecutive nodes appeared in the exclude list.
"""
node_scenario = {
"label_selector": "node-role.kubernetes.io/worker",
"exclude_label": "node-role.kubernetes.io/infra",
"instance_count": 4
}
action = "node_stop_scenario"
mock_scenario_object = Mock()
mock_scenario_object.affected_nodes_status = AffectedNodeStatus()
mock_scenario_object.affected_nodes_status.affected_nodes = []
# 4 worker nodes, exclude node-A and node-B (consecutive in list)
mock_common_funcs.get_node.side_effect = [
["node-A", "node-B", "node-C", "node-D"],
["node-A", "node-B"]
]
self.plugin.inject_node_scenario(
action,
node_scenario,
mock_scenario_object,
self.mock_kubecli,
self.mock_scenario_telemetry
)
# only node-C and node-D should be processed
self.assertEqual(mock_scenario_object.node_stop_scenario.call_count, 2)
calls = mock_scenario_object.node_stop_scenario.call_args_list
processed_nodes = [call[0][1] for call in calls]
self.assertIn("node-C", processed_nodes)
self.assertIn("node-D", processed_nodes)
self.assertNotIn("node-A", processed_nodes)
self.assertNotIn("node-B", processed_nodes)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,492 @@
#!/usr/bin/env python3
"""
Test suite for NodeNetworkChaosModule class
Usage:
python -m coverage run -a -m unittest tests/test_node_network_chaos.py -v
Assisted By: Claude Code
"""
import unittest
import queue
from unittest.mock import MagicMock, patch, call
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos import (
NodeNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosConfig,
NetworkChaosScenarioType,
)
class TestNodeNetworkChaosModule(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for NodeNetworkChaosModule
"""
self.mock_kubecli = MagicMock()
self.mock_kubernetes = MagicMock()
self.mock_kubecli.get_lib_kubernetes.return_value = self.mock_kubernetes
self.config = NetworkChaosConfig(
id="test-node-network-chaos",
image="test-image",
wait_duration=1,
test_duration=30,
label_selector="",
service_account="",
taints=[],
namespace="default",
instance_count=1,
target="worker-1",
execution="parallel",
interfaces=["eth0"],
ingress=True,
egress=True,
latency="100ms",
loss="10",
bandwidth="100mbit",
force=False,
)
self.module = NodeNetworkChaosModule(self.config, self.mock_kubecli)
def test_initialization(self):
"""
Test NodeNetworkChaosModule initialization
"""
self.assertEqual(self.module.config, self.config)
self.assertEqual(self.module.kubecli, self.mock_kubecli)
self.assertEqual(self.module.base_network_config, self.config)
def test_get_config(self):
"""
Test get_config returns correct scenario type and config
"""
scenario_type, config = self.module.get_config()
self.assertEqual(scenario_type, NetworkChaosScenarioType.Node)
self.assertEqual(config, self.config)
def test_get_targets_with_target_name(self):
"""
Test get_targets with specific node target name
"""
self.config.label_selector = ""
self.config.target = "worker-1"
self.mock_kubernetes.list_nodes.return_value = ["worker-1", "worker-2"]
targets = self.module.get_targets()
self.assertEqual(targets, ["worker-1"])
def test_get_targets_with_label_selector(self):
"""
Test get_targets with label selector
"""
self.config.label_selector = "node-role.kubernetes.io/worker="
self.mock_kubernetes.list_nodes.return_value = ["worker-1", "worker-2"]
targets = self.module.get_targets()
self.assertEqual(targets, ["worker-1", "worker-2"])
self.mock_kubernetes.list_nodes.assert_called_once_with(
"node-role.kubernetes.io/worker="
)
def test_get_targets_node_not_found(self):
"""
Test get_targets raises exception when node doesn't exist
"""
self.config.label_selector = ""
self.config.target = "non-existent-node"
self.mock_kubernetes.list_nodes.return_value = ["worker-1", "worker-2"]
with self.assertRaises(Exception) as context:
self.module.get_targets()
self.assertIn("not found", str(context.exception))
def test_get_targets_no_target_or_selector(self):
"""
Test get_targets raises exception when neither target nor selector specified
"""
self.config.label_selector = ""
self.config.target = ""
with self.assertRaises(Exception) as context:
self.module.get_targets()
self.assertIn("neither", str(context.exception))
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_success(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
mock_qdisc_is_simple,
):
"""
Test successful run of node network chaos
"""
# Mock setup returns container_ids and interfaces
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock qdisc check - simple qdisc
mock_qdisc_is_simple.return_value = True
self.module.run("worker-1")
# Verify setup was called with node name
mock_setup.assert_called_once()
setup_args = mock_setup.call_args[0]
# Node name should be passed as target and is_node=True (8th arg, index 7)
self.assertEqual(setup_args[7], True) # is_node flag
# Verify qdisc was checked
mock_qdisc_is_simple.assert_called_once()
# Verify tc rules were set (with pids=None for node scenario)
mock_set_rules.assert_called_once()
set_call_args = mock_set_rules.call_args
# pids should be None (last argument)
self.assertIsNone(set_call_args[0][-1])
# Verify sleep for test duration
mock_sleep.assert_called_once_with(30)
# Verify tc rules were deleted
mock_delete_rules.assert_called_once()
delete_call_args = mock_delete_rules.call_args
# pids should be None (7th argument, index 6)
self.assertIsNone(delete_call_args[0][6])
# Verify cleanup pod was deleted
self.assertEqual(self.mock_kubernetes.delete_pod.call_count, 1)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_error")
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_no_interfaces_detected(
self, mock_log_info, mock_log_error, mock_setup, mock_qdisc_is_simple
):
"""
Test run handles case when no network interfaces detected
"""
# Mock setup returns empty interfaces
mock_setup.return_value = (["container-123"], [])
# Set config to auto-detect interfaces
self.config.interfaces = []
self.module.run("worker-1")
# Verify error was logged
mock_log_error.assert_called()
self.assertIn("no network interface", str(mock_log_error.call_args))
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_warning"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_complex_qdisc_without_force(
self, mock_log_info, mock_log_warning, mock_setup, mock_qdisc_is_simple
):
"""
Test run skips chaos when complex qdisc exists and force=False
"""
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock qdisc check - complex qdisc
mock_qdisc_is_simple.return_value = False
# force is False
self.config.force = False
self.module.run("worker-1")
# Verify warning was logged
mock_log_warning.assert_called()
self.assertIn("already has tc rules", str(mock_log_warning.call_args))
# Verify cleanup pod was still deleted
self.assertEqual(self.mock_kubernetes.delete_pod.call_count, 1)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_warning"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_complex_qdisc_with_force(
self,
mock_log_info,
mock_log_warning,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
mock_qdisc_is_simple,
):
"""
Test run proceeds with chaos when complex qdisc exists and force=True
"""
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock qdisc check - complex qdisc
mock_qdisc_is_simple.return_value = False
# force is True
self.config.force = True
self.module.run("worker-1")
# Verify warning was logged about forcing
mock_log_warning.assert_called()
self.assertIn("forcing", str(mock_log_warning.call_args))
# Verify sleep for safety warning (10 seconds)
sleep_calls = [call[0][0] for call in mock_sleep.call_args_list]
self.assertIn(10, sleep_calls)
# Verify tc rules were set
mock_set_rules.assert_called_once()
# Verify tc rules were deleted
mock_delete_rules.assert_called_once()
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_uses_configured_interfaces(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
mock_qdisc_is_simple,
):
"""
Test run uses configured interfaces instead of detected ones
"""
# Mock setup returns different interfaces
mock_setup.return_value = (["container-123"], ["eth0", "eth1"])
# Mock qdisc check - simple qdisc
mock_qdisc_is_simple.return_value = True
# Set specific interfaces in config
self.config.interfaces = ["eth2"]
self.module.run("worker-1")
# Verify set_rules was called with configured interfaces
call_args = mock_set_rules.call_args
# interfaces is the 3rd positional argument (index 2)
self.assertEqual(call_args[0][2], ["eth2"])
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_with_error_queue(
self, mock_log_info, mock_setup, mock_qdisc_is_simple
):
"""
Test run with error_queue for parallel execution
"""
# Mock setup to raise exception
mock_setup.side_effect = Exception("Test error")
error_queue = queue.Queue()
self.module.run("worker-1", error_queue)
# Verify error was put in queue instead of raising
self.assertFalse(error_queue.empty())
error = error_queue.get()
self.assertEqual(error, "Test error")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_ingress_egress_flags(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
mock_qdisc_is_simple,
):
"""
Test run passes ingress and egress flags correctly
"""
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock qdisc check
mock_qdisc_is_simple.return_value = True
# Set specific ingress/egress config
self.config.ingress = False
self.config.egress = True
self.module.run("worker-1")
# Verify set_rules was called with correct egress/ingress flags
set_call_args = mock_set_rules.call_args
# egress is 1st arg (index 0), ingress is 2nd arg (index 1)
self.assertEqual(set_call_args[0][0], True) # egress
self.assertEqual(set_call_args[0][1], False) # ingress
# Verify delete_rules was called with correct flags
delete_call_args = mock_delete_rules.call_args
self.assertEqual(delete_call_args[0][0], True) # egress
self.assertEqual(delete_call_args[0][1], False) # ingress
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_warning"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_mixed_simple_and_complex_qdisc(
self, mock_log_info, mock_log_warning, mock_setup, mock_qdisc_is_simple
):
"""
Test run with multiple interfaces where some have complex qdisc
"""
# Mock setup with multiple interfaces
mock_setup.return_value = (["container-123"], ["eth0", "eth1"])
# Set config to use detected interfaces
self.config.interfaces = []
self.config.force = False
# Mock qdisc check - eth0 simple, eth1 complex
mock_qdisc_is_simple.side_effect = [True, False]
self.module.run("worker-1")
# Verify warning about complex qdisc on eth1
mock_log_warning.assert_called()
warning_message = str(mock_log_warning.call_args)
self.assertIn("already has tc rules", warning_message)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.node_qdisc_is_simple"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.node_network_chaos.log_info")
def test_run_checks_qdisc_for_all_interfaces(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
mock_qdisc_is_simple,
):
"""
Test run checks qdisc for all interfaces
"""
# Mock setup with multiple interfaces
mock_setup.return_value = (["container-123"], ["eth0", "eth1", "eth2"])
# Set config to use detected interfaces
self.config.interfaces = []
# All interfaces simple
mock_qdisc_is_simple.return_value = True
self.module.run("worker-1")
# Verify qdisc was checked for all 3 interfaces
self.assertEqual(mock_qdisc_is_simple.call_count, 3)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,451 @@
#!/usr/bin/env python3
"""
Test suite for PodNetworkChaosModule class
Usage:
python -m coverage run -a -m unittest tests/test_pod_network_chaos.py -v
Assisted By: Claude Code
"""
import unittest
import queue
from unittest.mock import MagicMock, patch, call
from dataclasses import dataclass
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos import (
PodNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosConfig,
NetworkChaosScenarioType,
)
class TestPodNetworkChaosModule(unittest.TestCase):
def setUp(self):
"""
Set up test fixtures for PodNetworkChaosModule
"""
self.mock_kubecli = MagicMock()
self.mock_kubernetes = MagicMock()
self.mock_kubecli.get_lib_kubernetes.return_value = self.mock_kubernetes
self.config = NetworkChaosConfig(
id="test-pod-network-chaos",
image="test-image",
wait_duration=1,
test_duration=30,
label_selector="",
service_account="",
taints=[],
namespace="default",
instance_count=1,
target="test-pod",
execution="parallel",
interfaces=["eth0"],
ingress=True,
egress=True,
latency="100ms",
loss="10",
bandwidth="100mbit",
)
self.module = PodNetworkChaosModule(self.config, self.mock_kubecli)
def test_initialization(self):
"""
Test PodNetworkChaosModule initialization
"""
self.assertEqual(self.module.config, self.config)
self.assertEqual(self.module.kubecli, self.mock_kubecli)
self.assertEqual(self.module.base_network_config, self.config)
def test_get_config(self):
"""
Test get_config returns correct scenario type and config
"""
scenario_type, config = self.module.get_config()
self.assertEqual(scenario_type, NetworkChaosScenarioType.Pod)
self.assertEqual(config, self.config)
def test_get_targets_with_target_name(self):
"""
Test get_targets with specific pod target name
"""
self.config.label_selector = ""
self.config.target = "test-pod"
self.mock_kubernetes.check_if_pod_exists.return_value = True
targets = self.module.get_targets()
self.assertEqual(targets, ["test-pod"])
self.mock_kubernetes.check_if_pod_exists.assert_called_once_with(
"test-pod", "default"
)
def test_get_targets_with_label_selector(self):
"""
Test get_targets with label selector
"""
self.config.label_selector = "app=nginx"
self.mock_kubernetes.list_pods.return_value = ["pod1", "pod2", "pod3"]
targets = self.module.get_targets()
self.assertEqual(targets, ["pod1", "pod2", "pod3"])
self.mock_kubernetes.list_pods.assert_called_once_with(
"default", "app=nginx"
)
def test_get_targets_pod_not_found(self):
"""
Test get_targets raises exception when pod doesn't exist
"""
self.config.label_selector = ""
self.config.target = "non-existent-pod"
self.mock_kubernetes.check_if_pod_exists.return_value = False
with self.assertRaises(Exception) as context:
self.module.get_targets()
self.assertIn("not found", str(context.exception))
def test_get_targets_no_namespace(self):
"""
Test get_targets raises exception when namespace not specified
"""
self.config.namespace = None
with self.assertRaises(Exception) as context:
self.module.get_targets()
self.assertIn("namespace not specified", str(context.exception))
def test_get_targets_no_target_or_selector(self):
"""
Test get_targets raises exception when neither target nor selector specified
"""
self.config.label_selector = ""
self.config.target = ""
with self.assertRaises(Exception) as context:
self.module.get_targets()
self.assertIn("neither", str(context.exception))
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_success(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
):
"""
Test successful run of pod network chaos
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup returns container_ids and interfaces
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock get_pod_pids
self.mock_kubernetes.get_pod_pids.return_value = ["1234"]
self.module.run("test-pod")
# Verify pod info was retrieved
self.mock_kubernetes.get_pod_info.assert_called_once_with(
"test-pod", "default"
)
# Verify setup was called
mock_setup.assert_called_once()
# Verify pids were resolved
self.mock_kubernetes.get_pod_pids.assert_called_once()
# Verify tc rules were set
mock_set_rules.assert_called_once()
# Verify sleep for test duration
mock_sleep.assert_called_once_with(30)
# Verify tc rules were deleted
mock_delete_rules.assert_called_once()
# Verify cleanup pod was deleted
self.assertEqual(self.mock_kubernetes.delete_pod.call_count, 1)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_pod_info_not_found(self, mock_log_info, mock_setup):
"""
Test run raises exception when pod info cannot be retrieved
"""
self.mock_kubernetes.get_pod_info.return_value = None
with self.assertRaises(Exception) as context:
self.module.run("test-pod")
self.assertIn("impossible to retrieve infos", str(context.exception))
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_error")
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_no_interfaces_detected(
self, mock_log_info, mock_log_error, mock_setup
):
"""
Test run handles case when no network interfaces detected
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup returns empty interfaces
mock_setup.return_value = (["container-123"], [])
# Set config to auto-detect interfaces
self.config.interfaces = []
self.module.run("test-pod")
# Verify error was logged
mock_log_error.assert_called()
self.assertIn("no network interface", str(mock_log_error.call_args))
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_no_container_id(self, mock_log_info, mock_setup):
"""
Test run raises exception when container id cannot be resolved
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup returns empty container_ids
mock_setup.return_value = ([], ["eth0"])
with self.assertRaises(Exception) as context:
self.module.run("test-pod")
self.assertIn("impossible to resolve container id", str(context.exception))
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_no_pids(self, mock_log_info, mock_setup):
"""
Test run raises exception when pids cannot be resolved
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock get_pod_pids returns empty
self.mock_kubernetes.get_pod_pids.return_value = []
with self.assertRaises(Exception) as context:
self.module.run("test-pod")
self.assertIn("impossible to resolve pid", str(context.exception))
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_uses_configured_interfaces(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
):
"""
Test run uses configured interfaces instead of detected ones
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup returns different interfaces
mock_setup.return_value = (["container-123"], ["eth0", "eth1"])
# Mock get_pod_pids
self.mock_kubernetes.get_pod_pids.return_value = ["1234"]
# Set specific interfaces in config
self.config.interfaces = ["eth2"]
self.module.run("test-pod")
# Verify set_rules was called with configured interfaces, not detected ones
call_args = mock_set_rules.call_args
# interfaces is the 3rd positional argument (index 2)
self.assertEqual(call_args[0][2], ["eth2"])
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_with_error_queue(self, mock_log_info, mock_setup):
"""
Test run with error_queue for parallel execution
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup to raise exception
mock_setup.side_effect = Exception("Test error")
error_queue = queue.Queue()
self.module.run("test-pod", error_queue)
# Verify error was put in queue instead of raising
self.assertFalse(error_queue.empty())
error = error_queue.get()
self.assertEqual(error, "Test error")
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_passes_correct_pids(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
):
"""
Test run passes pids correctly to set and delete rules
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock get_pod_pids
test_pids = ["1234", "5678"]
self.mock_kubernetes.get_pod_pids.return_value = test_pids
self.module.run("test-pod")
# Verify set_rules was called with pids
set_call_args = mock_set_rules.call_args
# pids is the last positional argument
self.assertEqual(set_call_args[0][-1], test_pids)
# Verify delete_rules was called with pids
delete_call_args = mock_delete_rules.call_args
# pids is argument at index 6
self.assertEqual(delete_call_args[0][6], test_pids)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.time.sleep")
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_delete_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.common_set_limit_rules"
)
@patch(
"krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.setup_network_chaos_ng_scenario"
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.pod_network_chaos.log_info")
def test_run_ingress_egress_flags(
self,
mock_log_info,
mock_setup,
mock_set_rules,
mock_delete_rules,
mock_sleep,
):
"""
Test run passes ingress and egress flags correctly
"""
# Mock pod info
mock_pod_info = MagicMock()
mock_pod_info.nodeName = "worker-1"
self.mock_kubernetes.get_pod_info.return_value = mock_pod_info
# Mock setup
mock_setup.return_value = (["container-123"], ["eth0"])
# Mock get_pod_pids
self.mock_kubernetes.get_pod_pids.return_value = ["1234"]
# Set specific ingress/egress config
self.config.ingress = False
self.config.egress = True
self.module.run("test-pod")
# Verify set_rules was called with correct egress/ingress flags
set_call_args = mock_set_rules.call_args
# egress is 1st arg (index 0), ingress is 2nd arg (index 1)
self.assertEqual(set_call_args[0][0], True) # egress
self.assertEqual(set_call_args[0][1], False) # ingress
# Verify delete_rules was called with correct flags
delete_call_args = mock_delete_rules.call_args
self.assertEqual(delete_call_args[0][0], True) # egress
self.assertEqual(delete_call_args[0][1], False) # ingress
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,599 @@
#!/usr/bin/env python3
"""
Test suite for utils_network_chaos module
Usage:
python -m coverage run -a -m unittest tests/test_utils_network_chaos.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, patch, call
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos import (
get_build_tc_tree_commands,
namespaced_tc_commands,
get_egress_shaping_comand,
get_clear_egress_shaping_commands,
get_ingress_shaping_commands,
get_clear_ingress_shaping_commands,
node_qdisc_is_simple,
common_set_limit_rules,
common_delete_limit_rules,
ROOT_HANDLE,
CLASS_ID,
NETEM_HANDLE,
)
class TestBuildTcTreeCommands(unittest.TestCase):
def test_build_tc_tree_single_interface(self):
"""
Test building tc tree commands for a single interface
"""
devices = ["eth0"]
result = get_build_tc_tree_commands(devices)
self.assertEqual(len(result), 3)
self.assertIn("tc qdisc add dev eth0 root handle 100: htb default 1", result)
self.assertIn(
"tc class add dev eth0 parent 100: classid 100:1 htb rate 1gbit", result
)
self.assertIn(
"tc qdisc add dev eth0 parent 100:1 handle 101: netem delay 0ms loss 0%",
result,
)
def test_build_tc_tree_multiple_interfaces(self):
"""
Test building tc tree commands for multiple interfaces
"""
devices = ["eth0", "eth1"]
result = get_build_tc_tree_commands(devices)
self.assertEqual(len(result), 6)
# Verify commands for eth0
self.assertIn("tc qdisc add dev eth0 root handle 100: htb default 1", result)
# Verify commands for eth1
self.assertIn("tc qdisc add dev eth1 root handle 100: htb default 1", result)
def test_build_tc_tree_empty_list(self):
"""
Test building tc tree commands with empty device list
"""
devices = []
result = get_build_tc_tree_commands(devices)
self.assertEqual(len(result), 0)
class TestNamespacedTcCommands(unittest.TestCase):
def test_namespaced_commands_single_pid(self):
"""
Test wrapping commands with nsenter for single pid
"""
pids = ["1234"]
commands = ["tc qdisc add dev eth0 root handle 100: htb"]
result = namespaced_tc_commands(pids, commands)
self.assertEqual(len(result), 1)
self.assertEqual(
result[0],
"nsenter --target 1234 --net -- tc qdisc add dev eth0 root handle 100: htb",
)
def test_namespaced_commands_multiple_pids(self):
"""
Test wrapping commands with nsenter for multiple pids
"""
pids = ["1234", "5678"]
commands = ["tc qdisc add dev eth0 root handle 100: htb"]
result = namespaced_tc_commands(pids, commands)
self.assertEqual(len(result), 2)
self.assertIn(
"nsenter --target 1234 --net -- tc qdisc add dev eth0 root handle 100: htb",
result,
)
self.assertIn(
"nsenter --target 5678 --net -- tc qdisc add dev eth0 root handle 100: htb",
result,
)
def test_namespaced_commands_multiple_pids_and_commands(self):
"""
Test wrapping multiple commands for multiple pids
"""
pids = ["1234", "5678"]
commands = ["tc qdisc add dev eth0 root", "tc class add dev eth0"]
result = namespaced_tc_commands(pids, commands)
self.assertEqual(len(result), 4)
class TestEgressShapingCommands(unittest.TestCase):
def test_egress_shaping_with_all_params(self):
"""
Test egress shaping commands with bandwidth, latency and loss
"""
devices = ["eth0"]
result = get_egress_shaping_comand(devices, "100", "50", "10")
self.assertEqual(len(result), 2)
self.assertIn(
"tc class change dev eth0 parent 100: classid 100:1 htb rate 100mbit",
result,
)
self.assertIn(
"tc qdisc change dev eth0 parent 100:1 handle 101: netem delay 50ms loss 10%",
result,
)
def test_egress_shaping_with_defaults(self):
"""
Test that egress shaping commands with None values default to 1gbit, 0ms, 0%
"""
devices = ["eth0"]
result = get_egress_shaping_comand(devices, None, None, None)
self.assertEqual(len(result), 2)
self.assertIn(
"tc class change dev eth0 parent 100: classid 100:1 htb rate 1gbit", result
)
self.assertIn(
"tc qdisc change dev eth0 parent 100:1 handle 101: netem delay 0ms loss 0%",
result,
)
def test_egress_shaping_multiple_interfaces(self):
"""
Test egress shaping for multiple interfaces
"""
devices = ["eth0", "eth1"]
result = get_egress_shaping_comand(devices, "100", "50", "10")
self.assertEqual(len(result), 4)
class TestClearEgressShapingCommands(unittest.TestCase):
def test_clear_egress_single_interface(self):
"""
Test clear egress shaping for single interface
"""
devices = ["eth0"]
result = get_clear_egress_shaping_commands(devices)
self.assertEqual(len(result), 1)
self.assertIn("tc qdisc del dev eth0 root handle 100:", result)
def test_clear_egress_multiple_interfaces(self):
"""
Test clear egress shaping for multiple interfaces
"""
devices = ["eth0", "eth1"]
result = get_clear_egress_shaping_commands(devices)
self.assertEqual(len(result), 2)
self.assertIn("tc qdisc del dev eth0 root handle 100:", result)
self.assertIn("tc qdisc del dev eth1 root handle 100:", result)
class TestIngressShapingCommands(unittest.TestCase):
def test_ingress_shaping_with_all_params(self):
"""
Test ingress shaping commands with bandwidth, latency and loss
"""
devices = ["eth0"]
result = get_ingress_shaping_commands(devices, "100", "50ms", "10")
# Should have: modprobe, ip link add, ip link set, tc qdisc add ingress,
# tc filter add, tc qdisc add root, tc class add, tc qdisc add netem
self.assertGreater(len(result), 7)
self.assertIn("modprobe ifb || true", result)
self.assertIn("ip link add ifb0 type ifb || true", result)
self.assertIn("ip link set ifb0 up || true", result)
self.assertIn("tc qdisc add dev eth0 handle ffff: ingress || true", result)
# Check that bandwidth, latency, loss are in commands
self.assertTrue(any("100" in cmd for cmd in result))
self.assertTrue(any("50ms" in cmd for cmd in result))
self.assertTrue(any("10" in cmd for cmd in result))
def test_ingress_shaping_with_defaults(self):
"""
Test ingress shaping with None values uses defaults
"""
devices = ["eth0"]
result = get_ingress_shaping_commands(devices, None, None, None)
self.assertGreater(len(result), 7)
# Should use 1gbit, 0ms, 0% as defaults
self.assertTrue(any("1gbit" in cmd for cmd in result))
self.assertTrue(any("0ms" in cmd for cmd in result))
self.assertTrue(any("0%" in cmd for cmd in result))
def test_ingress_shaping_custom_ifb_device(self):
"""
Test ingress shaping with custom ifb device name
"""
devices = ["eth0"]
result = get_ingress_shaping_commands(devices, "100", "50ms", "10", "ifb1")
self.assertIn("ip link add ifb1 type ifb || true", result)
self.assertIn("ip link set ifb1 up || true", result)
class TestClearIngressShapingCommands(unittest.TestCase):
def test_clear_ingress_single_interface(self):
"""
Test clear ingress shaping for single interface
"""
devices = ["eth0"]
result = get_clear_ingress_shaping_commands(devices)
self.assertGreater(len(result), 3)
self.assertIn("tc qdisc del dev eth0 ingress || true", result)
self.assertIn("tc qdisc del dev ifb0 root handle 100: || true", result)
self.assertIn("ip link set ifb0 down || true", result)
self.assertIn("ip link del ifb0 || true", result)
def test_clear_ingress_multiple_interfaces(self):
"""
Test clear ingress shaping for multiple interfaces
"""
devices = ["eth0", "eth1"]
result = get_clear_ingress_shaping_commands(devices)
self.assertIn("tc qdisc del dev eth0 ingress || true", result)
self.assertIn("tc qdisc del dev eth1 ingress || true", result)
def test_clear_ingress_custom_ifb_device(self):
"""
Test clear ingress with custom ifb device
"""
devices = ["eth0"]
result = get_clear_ingress_shaping_commands(devices, "ifb1")
self.assertIn("tc qdisc del dev ifb1 root handle 100: || true", result)
self.assertIn("ip link set ifb1 down || true", result)
self.assertIn("ip link del ifb1 || true", result)
class TestNodeQdiscIsSimple(unittest.TestCase):
def test_node_qdisc_is_simple_with_simple_qdisc(self):
"""
Test node_qdisc_is_simple returns True for simple qdisc (e.g., pfifo_fast)
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = (
"qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap 1 2 2 2"
)
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertTrue(result)
mock_kubecli.exec_cmd_in_pod.assert_called_once_with(
["tc qdisc show dev eth0"], "test-pod", "default"
)
def test_node_qdisc_is_simple_with_htb(self):
"""
Test node_qdisc_is_simple returns False for htb qdisc
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = (
"qdisc htb 100: root refcnt 2 r2q 10 default 1"
)
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertFalse(result)
def test_node_qdisc_is_simple_with_netem(self):
"""
Test node_qdisc_is_simple returns False for netem qdisc
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = (
"qdisc netem 101: root refcnt 2 limit 1000 delay 100ms"
)
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertFalse(result)
def test_node_qdisc_is_simple_with_clsact(self):
"""
Test node_qdisc_is_simple returns False for clsact qdisc
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = "qdisc clsact ffff: parent ffff:fff1"
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertFalse(result)
def test_node_qdisc_is_simple_with_multiple_lines(self):
"""
Test node_qdisc_is_simple returns False when multiple qdisc lines exist
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = (
"qdisc pfifo_fast 0: root\nqdisc htb 100: dev eth0"
)
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertFalse(result)
def test_node_qdisc_is_simple_case_insensitive(self):
"""
Test node_qdisc_is_simple check is case insensitive
"""
mock_kubecli = MagicMock()
mock_kubecli.exec_cmd_in_pod.return_value = "qdisc HTB 100: root"
result = node_qdisc_is_simple(mock_kubecli, "test-pod", "default", "eth0")
self.assertFalse(result)
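# Taken together, the cases above pin down the intended notion of a "simple"
# qdisc: the `tc qdisc show dev <iface>` output must be a single line and must
# not mention a shaping or classifier qdisc. One way to satisfy exactly these
# cases (a sketch only, not necessarily how the module implements it):
def _node_qdisc_is_simple_sketch(kubecli, pod_name, namespace, interface) -> bool:
    output = kubecli.exec_cmd_in_pod(
        [f"tc qdisc show dev {interface}"], pod_name, namespace
    )
    lines = [line for line in output.strip().splitlines() if line.strip()]
    if len(lines) != 1:
        # more than one qdisc attached means tc rules already exist
        return False
    # case-insensitive check for qdiscs that indicate existing traffic shaping
    return not any(kind in lines[0].lower() for kind in ("htb", "netem", "clsact"))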
class TestCommonSetLimitRules(unittest.TestCase):
def setUp(self):
"""
Set up mock kubecli for all tests
"""
self.mock_kubecli = MagicMock()
self.mock_kubecli.exec_cmd_in_pod.return_value = ""
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_set_egress_only(self, mock_log_info):
"""
Test setting egress rules only
"""
common_set_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
bandwidth="100",
latency="50",
loss="10",
parallel=False,
target="test-target",
kubecli=self.mock_kubecli,
network_chaos_pod_name="chaos-pod",
namespace="default",
pids=None,
)
# Should call exec_cmd_in_pod for egress rules (3 build + 2 shaping)
self.assertGreaterEqual(self.mock_kubecli.exec_cmd_in_pod.call_count, 5)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_set_ingress_only(self, mock_log_info):
"""
Test setting ingress rules only
"""
common_set_limit_rules(
egress=False,
ingress=True,
interfaces=["eth0"],
bandwidth="100",
latency="50",
loss="10",
parallel=False,
target="test-target",
kubecli=self.mock_kubecli,
network_chaos_pod_name="chaos-pod",
namespace="default",
pids=None,
)
# Should call exec_cmd_in_pod for ingress rules
self.assertGreater(self.mock_kubecli.exec_cmd_in_pod.call_count, 0)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_set_both_egress_and_ingress(self, mock_log_info):
"""
Test setting both egress and ingress rules
"""
common_set_limit_rules(
egress=True,
ingress=True,
interfaces=["eth0"],
bandwidth="100",
latency="50",
loss="10",
parallel=False,
target="test-target",
kubecli=self.mock_kubecli,
network_chaos_pod_name="chaos-pod",
namespace="default",
pids=None,
)
# Should call exec_cmd_in_pod for both egress and ingress
self.assertGreater(self.mock_kubecli.exec_cmd_in_pod.call_count, 10)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_set_with_pids(self, mock_log_info):
"""
Test setting rules with pids (namespace mode)
"""
common_set_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
bandwidth="100",
latency="50",
loss="10",
parallel=False,
target="test-target",
kubecli=self.mock_kubecli,
network_chaos_pod_name="chaos-pod",
namespace="default",
pids=["1234"],
)
# Verify that commands include nsenter
calls = self.mock_kubecli.exec_cmd_in_pod.call_args_list
self.assertTrue(
any("nsenter" in str(call) for call in calls),
"Expected nsenter commands when pids are provided",
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_error")
def test_set_with_command_failure(self, mock_log_error):
"""
Test handling of command failures
"""
# Simulate all commands failing
self.mock_kubecli.exec_cmd_in_pod.return_value = "error"
common_set_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
bandwidth="100",
latency="50",
loss="10",
parallel=False,
target="test-target",
kubecli=self.mock_kubecli,
network_chaos_pod_name="chaos-pod",
namespace="default",
pids=None,
)
# Should log error when all commands fail
mock_log_error.assert_called()
class TestCommonDeleteLimitRules(unittest.TestCase):
def setUp(self):
"""
Set up mock kubecli for all tests
"""
self.mock_kubecli = MagicMock()
self.mock_kubecli.exec_cmd_in_pod.return_value = ""
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_delete_egress_only(self, mock_log_info):
"""
Test deleting egress rules only
"""
common_delete_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
network_chaos_pod_name="chaos-pod",
network_chaos_namespace="default",
kubecli=self.mock_kubecli,
pids=None,
parallel=False,
target="test-target",
)
# Should call exec_cmd_in_pod for egress cleanup
self.assertGreater(self.mock_kubecli.exec_cmd_in_pod.call_count, 0)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_delete_ingress_only(self, mock_log_info):
"""
Test deleting ingress rules only
"""
common_delete_limit_rules(
egress=False,
ingress=True,
interfaces=["eth0"],
network_chaos_pod_name="chaos-pod",
network_chaos_namespace="default",
kubecli=self.mock_kubecli,
pids=None,
parallel=False,
target="test-target",
)
# Should call exec_cmd_in_pod for ingress cleanup
self.assertGreater(self.mock_kubecli.exec_cmd_in_pod.call_count, 0)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_delete_both_egress_and_ingress(self, mock_log_info):
"""
Test deleting both egress and ingress rules
"""
common_delete_limit_rules(
egress=True,
ingress=True,
interfaces=["eth0"],
network_chaos_pod_name="chaos-pod",
network_chaos_namespace="default",
kubecli=self.mock_kubecli,
pids=None,
parallel=False,
target="test-target",
)
# Should call exec_cmd_in_pod for both egress and ingress
self.assertGreater(self.mock_kubecli.exec_cmd_in_pod.call_count, 3)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_info")
def test_delete_with_pids(self, mock_log_info):
"""
Test deleting rules with pids (namespace mode)
"""
common_delete_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
network_chaos_pod_name="chaos-pod",
network_chaos_namespace="default",
kubecli=self.mock_kubecli,
pids=["1234"],
parallel=False,
target="test-target",
)
# Verify that commands include nsenter
calls = self.mock_kubecli.exec_cmd_in_pod.call_args_list
self.assertTrue(
any("nsenter" in str(call) for call in calls),
"Expected nsenter commands when pids are provided",
)
@patch("krkn.scenario_plugins.network_chaos_ng.modules.utils_network_chaos.log_error")
def test_delete_with_command_failure(self, mock_log_error):
"""
Test handling of command failures during deletion
"""
# Simulate all commands failing
self.mock_kubecli.exec_cmd_in_pod.return_value = "error"
common_delete_limit_rules(
egress=True,
ingress=False,
interfaces=["eth0"],
network_chaos_pod_name="chaos-pod",
network_chaos_namespace="default",
kubecli=self.mock_kubecli,
pids=None,
parallel=False,
target="test-target",
)
# Should log error when all commands fail
mock_log_error.assert_called()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,826 @@
#!/usr/bin/env python3
"""
Test suite for VMWare node scenarios
This test suite covers both the vSphere class and the vmware_node_scenarios class
using mocks to avoid actual VMware API calls.
Usage:
python -m coverage run -a -m unittest tests/test_vmware_node_scenarios.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from krkn.scenario_plugins.node_actions.vmware_node_scenarios import vmware_node_scenarios, vSphere
from krkn_lib.models.k8s import AffectedNodeStatus
from com.vmware.vcenter.vm_client import Power
class TestVmwareNodeScenarios(unittest.TestCase):
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def setUp(self, mock_vsphere_class):
# Mock the configuration and dependencies
self.mock_kubecli = MagicMock()
self.mock_affected_nodes_status = AffectedNodeStatus()
self.mock_vsphere = MagicMock()
mock_vsphere_class.return_value = self.mock_vsphere
# Initialize the scenario class
self.vmware_scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=self.mock_affected_nodes_status
)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_reboot_node_success(self, mock_vsphere_class):
"""Test successful node reboot."""
node_name = "test-node-01"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.reboot_instances.return_value = True
# Create a fresh instance with mocked vSphere
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Execute the reboot scenario
scenarios.node_reboot_scenario(
instance_kill_count=1,
node=node_name,
timeout=300
)
# Assertions
mock_vsphere.reboot_instances.assert_called_with(node_name)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_node_not_found(self, mock_vsphere_class):
"""Test behavior when the VM does not exist in vCenter."""
node_name = "non-existent-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.get_vm.return_value = None
mock_vsphere.reboot_instances.side_effect = Exception(f"VM {node_name} not found")
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# This should handle the exception gracefully (just log it)
scenarios.node_reboot_scenario(
instance_kill_count=1,
node=node_name,
timeout=300
)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_stop_start_node(self, mock_vsphere_class):
"""Test stopping and then starting a node."""
node_name = "test-node-02"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.stop_instances.return_value = True
mock_vsphere.start_instances.return_value = True
mock_vsphere.wait_until_stopped.return_value = True
mock_vsphere.wait_until_running.return_value = True
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Test stop scenario
scenarios.node_stop_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
mock_vsphere.stop_instances.assert_called_with(node_name)
# Test start scenario
scenarios.node_start_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
mock_vsphere.start_instances.assert_called_with(node_name)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_vcenter_connection_failure(self, mock_vsphere_class):
"""Test scenario where connection to vCenter fails."""
# Force the vSphere init to raise an exception
mock_vsphere_class.side_effect = Exception("Connection Refused")
with self.assertRaises(Exception):
vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_node_terminate_scenario(self, mock_vsphere_class):
"""Test node termination scenario."""
node_name = "test-node-terminate"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.stop_instances.return_value = True
mock_vsphere.wait_until_stopped.return_value = True
mock_vsphere.wait_until_released.return_value = True
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Execute terminate scenario
scenarios.node_terminate_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Verify the sequence of calls
mock_vsphere.stop_instances.assert_called_with(node_name)
mock_vsphere.wait_until_stopped.assert_called_once()
mock_vsphere.release_instances.assert_called_with(node_name)
mock_vsphere.wait_until_released.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_node_already_stopped(self, mock_vsphere_class):
"""Test scenario when node is already in the stopped state."""
node_name = "already-stopped-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
# Return False indicating VM is already stopped
mock_vsphere.stop_instances.return_value = False
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
scenarios.node_stop_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Should still call stop_instances but not wait_until_stopped
mock_vsphere.stop_instances.assert_called_with(node_name)
mock_vsphere.wait_until_stopped.assert_not_called()
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_node_already_started(self, mock_vsphere_class):
"""Test scenario when node is already in the running state."""
node_name = "already-running-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
# Return False indicating VM is already running
mock_vsphere.start_instances.return_value = False
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
scenarios.node_start_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Should still call start_instances but not wait_until_running
mock_vsphere.start_instances.assert_called_with(node_name)
mock_vsphere.wait_until_running.assert_not_called()
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.nodeaction')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_reboot_with_kube_check(self, mock_vsphere_class, mock_nodeaction):
"""Test reboot scenario with Kubernetes health checks enabled."""
node_name = "test-node-kube-check"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.reboot_instances.return_value = True
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=True, # Enable kube checks
affected_nodes_status=AffectedNodeStatus()
)
scenarios.node_reboot_scenario(
instance_kill_count=1,
node=node_name,
timeout=300
)
# Verify kube health check was called
mock_nodeaction.wait_for_unknown_status.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.nodeaction')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_start_with_kube_check(self, mock_vsphere_class, mock_nodeaction):
"""Test start scenario with Kubernetes health checks enabled."""
node_name = "test-node-start-kube"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.start_instances.return_value = True
mock_vsphere.wait_until_running.return_value = True
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=True,
affected_nodes_status=AffectedNodeStatus()
)
scenarios.node_start_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Verify both vSphere and kube checks were called
mock_vsphere.wait_until_running.assert_called_once()
mock_nodeaction.wait_for_ready_status.assert_called_once()
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_multiple_instance_kill_count(self, mock_vsphere_class):
"""Test scenario with multiple instance kill count (loop)."""
node_name = "test-node-multiple"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.reboot_instances.return_value = True
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Test with kill count of 3
scenarios.node_reboot_scenario(
instance_kill_count=3,
node=node_name,
timeout=300
)
# Should be called 3 times
assert mock_vsphere.reboot_instances.call_count == 3
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_stop_failure_exception_handling(self, mock_vsphere_class):
"""Test exception handling during node stop."""
node_name = "failing-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.stop_instances.side_effect = Exception("vSphere API Error")
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Should not raise exception, just log it
scenarios.node_stop_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Verify it attempted to stop
mock_vsphere.stop_instances.assert_called_with(node_name)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_terminate_failure_exception_handling(self, mock_vsphere_class):
"""Test exception handling during node termination."""
node_name = "terminate-failing-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.stop_instances.return_value = True
mock_vsphere.wait_until_stopped.return_value = True
mock_vsphere.release_instances.side_effect = Exception("Cannot delete VM")
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
# Should not raise exception
scenarios.node_terminate_scenario(
instance_kill_count=1,
node=node_name,
timeout=300,
poll_interval=5
)
# Verify termination was attempted
mock_vsphere.release_instances.assert_called_with(node_name)
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_affected_nodes_tracking(self, mock_vsphere_class):
"""Test that affected nodes are properly tracked."""
node_name = "tracked-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
mock_vsphere.reboot_instances.return_value = True
affected_status = AffectedNodeStatus()
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=affected_status
)
# Verify no affected nodes initially
assert len(affected_status.affected_nodes) == 0
scenarios.node_reboot_scenario(
instance_kill_count=1,
node=node_name,
timeout=300
)
# Verify affected node was tracked
assert len(affected_status.affected_nodes) == 1
assert affected_status.affected_nodes[0].node_name == node_name
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.vSphere')
def test_reboot_not_allowed_state(self, mock_vsphere_class):
"""Test reboot when VM is in a state that doesn't allow reboot."""
node_name = "powered-off-node"
mock_vsphere = MagicMock()
mock_vsphere_class.return_value = mock_vsphere
# Return False indicating reboot failed (VM not powered on)
mock_vsphere.reboot_instances.return_value = False
scenarios = vmware_node_scenarios(
kubecli=self.mock_kubecli,
node_action_kube_check=False,
affected_nodes_status=AffectedNodeStatus()
)
scenarios.node_reboot_scenario(
instance_kill_count=1,
node=node_name,
timeout=300
)
# Should attempt reboot
mock_vsphere.reboot_instances.assert_called_with(node_name)
class TestVSphereClass(unittest.TestCase):
"""Test suite for the vSphere class."""
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_vsphere_initialization_success(self, mock_session, mock_create_client):
"""Test successful vSphere client initialization."""
mock_client = MagicMock()
mock_create_client.return_value = mock_client
vsphere = vSphere()
self.assertEqual(vsphere.server, '192.168.1.100')
self.assertEqual(vsphere.username, 'admin')
self.assertEqual(vsphere.password, 'password123')
self.assertTrue(vsphere.credentials_present)
mock_create_client.assert_called_once()
@patch.dict('os.environ', {}, clear=True)
def test_vsphere_initialization_missing_credentials(self):
"""Test vSphere initialization fails when credentials are missing."""
with self.assertRaises(Exception) as context:
vSphere()
self.assertIn("Environmental variables", str(context.exception))
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_vm_success(self, mock_session, mock_create_client):
"""Test getting a VM by name."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_create_client.return_value = mock_client
vsphere = vSphere()
vm_id = vsphere.get_vm('test-vm')
self.assertEqual(vm_id, 'vm-123')
mock_client.vcenter.VM.list.assert_called_once()
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_vm_not_found(self, mock_session, mock_create_client):
"""Test getting a VM that doesn't exist."""
mock_client = MagicMock()
mock_client.vcenter.VM.list.return_value = []
mock_create_client.return_value = mock_client
vsphere = vSphere()
vm_id = vsphere.get_vm('non-existent-vm')
self.assertIsNone(vm_id)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_reboot_instances_success(self, mock_session, mock_create_client):
"""Test successful VM reboot."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.reboot_instances('test-vm')
self.assertTrue(result)
mock_client.vcenter.vm.Power.reset.assert_called_with('vm-123')
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_reboot_instances_not_powered_on(self, mock_session, mock_create_client):
"""Test reboot fails when VM is not powered on."""
from com.vmware.vapi.std.errors_client import NotAllowedInCurrentState
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_client.vcenter.vm.Power.reset.side_effect = NotAllowedInCurrentState()
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.reboot_instances('test-vm')
self.assertFalse(result)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_stop_instances_success(self, mock_session, mock_create_client):
"""Test successful VM stop."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.stop_instances('test-vm')
self.assertTrue(result)
mock_client.vcenter.vm.Power.stop.assert_called_with('vm-123')
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_stop_instances_already_stopped(self, mock_session, mock_create_client):
"""Test stop when VM is already stopped."""
from com.vmware.vapi.std.errors_client import AlreadyInDesiredState
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_client.vcenter.vm.Power.stop.side_effect = AlreadyInDesiredState()
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.stop_instances('test-vm')
self.assertFalse(result)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_start_instances_success(self, mock_session, mock_create_client):
"""Test successful VM start."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.start_instances('test-vm')
self.assertTrue(result)
mock_client.vcenter.vm.Power.start.assert_called_with('vm-123')
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_start_instances_already_started(self, mock_session, mock_create_client):
"""Test start when VM is already running."""
from com.vmware.vapi.std.errors_client import AlreadyInDesiredState
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_client.vcenter.vm.Power.start.side_effect = AlreadyInDesiredState()
mock_create_client.return_value = mock_client
vsphere = vSphere()
result = vsphere.start_instances('test-vm')
self.assertFalse(result)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_vm_status(self, mock_session, mock_create_client):
"""Test getting VM status."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_power_state = MagicMock()
mock_power_state.state = Power.State.POWERED_ON
mock_client.vcenter.vm.Power.get.return_value = mock_power_state
mock_create_client.return_value = mock_client
vsphere = vSphere()
status = vsphere.get_vm_status('test-vm')
self.assertEqual(status, Power.State.POWERED_ON)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_vm_status_exception(self, mock_session, mock_create_client):
"""Test get_vm_status handles exceptions gracefully."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
mock_client.vcenter.vm.Power.get.side_effect = Exception("API Error")
mock_create_client.return_value = mock_client
vsphere = vSphere()
status = vsphere.get_vm_status('test-vm')
self.assertIsNone(status)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.time.sleep')
def test_wait_until_running(self, mock_sleep, mock_session, mock_create_client):
"""Test waiting for VM to reach POWERED_ON state."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
# Simulate VM transitioning to POWERED_ON after 2 checks
mock_power_states = [
MagicMock(state=Power.State.POWERED_OFF),
MagicMock(state=Power.State.POWERED_ON)
]
mock_client.vcenter.vm.Power.get.side_effect = mock_power_states
mock_create_client.return_value = mock_client
vsphere = vSphere()
mock_affected_node = MagicMock()
result = vsphere.wait_until_running('test-vm', timeout=60, affected_node=mock_affected_node)
self.assertTrue(result)
mock_affected_node.set_affected_node_status.assert_called_once()
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.time.sleep')
def test_wait_until_stopped(self, mock_sleep, mock_session, mock_create_client):
"""Test waiting for VM to reach POWERED_OFF state."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
# Simulate VM transitioning to POWERED_OFF
mock_power_states = [
MagicMock(state=Power.State.POWERED_ON),
MagicMock(state=Power.State.POWERED_OFF)
]
mock_client.vcenter.vm.Power.get.side_effect = mock_power_states
mock_create_client.return_value = mock_client
vsphere = vSphere()
mock_affected_node = MagicMock()
result = vsphere.wait_until_stopped('test-vm', timeout=60, affected_node=mock_affected_node)
self.assertTrue(result)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.time.sleep')
def test_wait_until_running_timeout(self, mock_sleep, mock_session, mock_create_client):
"""Test wait_until_running times out."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
mock_client.vcenter.VM.list.return_value = [mock_vm_obj]
        # The VM stays POWERED_OFF until the timeout is exceeded, then reports POWERED_ON so the polling loop can exit
call_count = [0]
def get_status_side_effect(vm):
call_count[0] += 1
# Return POWERED_OFF for first 2 calls (to exceed timeout=2 with 5 second increments)
# Then return POWERED_ON to exit the loop
if call_count[0] <= 2:
return MagicMock(state=Power.State.POWERED_OFF)
return MagicMock(state=Power.State.POWERED_ON)
mock_client.vcenter.vm.Power.get.side_effect = get_status_side_effect
mock_create_client.return_value = mock_client
vsphere = vSphere()
mock_affected_node = MagicMock()
result = vsphere.wait_until_running('test-vm', timeout=2, affected_node=mock_affected_node)
self.assertFalse(result)
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.time.sleep')
def test_wait_until_released(self, mock_sleep, mock_session, mock_create_client):
"""Test waiting for VM to be deleted."""
mock_client = MagicMock()
mock_vm_obj = MagicMock()
mock_vm_obj.vm = 'vm-123'
# VM exists first, then is deleted
mock_client.vcenter.VM.list.side_effect = [
[mock_vm_obj], # VM exists
[] # VM deleted
]
mock_create_client.return_value = mock_client
vsphere = vSphere()
mock_affected_node = MagicMock()
result = vsphere.wait_until_released('test-vm', timeout=60, affected_node=mock_affected_node)
self.assertTrue(result)
mock_affected_node.set_affected_node_status.assert_called_once()
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_datacenter_list(self, mock_session, mock_create_client):
"""Test getting list of datacenters."""
mock_client = MagicMock()
mock_dc1 = MagicMock()
mock_dc1.datacenter = 'dc-1'
mock_dc1.name = 'Datacenter1'
mock_dc2 = MagicMock()
mock_dc2.datacenter = 'dc-2'
mock_dc2.name = 'Datacenter2'
mock_client.vcenter.Datacenter.list.return_value = [mock_dc1, mock_dc2]
mock_create_client.return_value = mock_client
vsphere = vSphere()
datacenters = vsphere.get_datacenter_list()
self.assertEqual(len(datacenters), 2)
self.assertEqual(datacenters[0]['datacenter_name'], 'Datacenter1')
self.assertEqual(datacenters[1]['datacenter_name'], 'Datacenter2')
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_release_instances_vm_not_found(self, mock_session, mock_create_client):
"""Test release_instances raises exception when VM not found."""
mock_client = MagicMock()
mock_client.vcenter.VM.list.return_value = []
mock_create_client.return_value = mock_client
vsphere = vSphere()
with self.assertRaises(Exception) as context:
vsphere.release_instances('non-existent-vm')
self.assertIn("does not exist", str(context.exception))
@patch.dict('os.environ', {
'VSPHERE_IP': '192.168.1.100',
'VSPHERE_USERNAME': 'admin',
'VSPHERE_PASSWORD': 'password123'
})
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.create_vsphere_client')
@patch('krkn.scenario_plugins.node_actions.vmware_node_scenarios.requests.session')
def test_get_unverified_session(self, mock_session_class, mock_create_client):
"""Test creating an unverified session."""
mock_session_instance = MagicMock()
mock_session_class.return_value = mock_session_instance
mock_create_client.return_value = MagicMock()
vsphere = vSphere()
session = vsphere.get_unverified_session()
self.assertFalse(session.verify)
mock_session_class.assert_called()

View File

@@ -1,54 +0,0 @@
# ⚠️ DEPRECATED - This project has moved
> **All development has moved to [github.com/krkn-chaos/krkn-ai](https://github.com/krkn-chaos/krkn-ai)**
>
> This directory is no longer maintained. Please visit the new repository for:
> - Latest features and updates
> - Active development and support
> - Bug fixes and improvements
> - Documentation and examples
>
> See [../README.md](../README.md) for more information.
---
# aichaos
Enhancing Chaos Engineering with AI-assisted fault injection for better resiliency and non-functional testing.
## Generate python package wheel file
```
$ python3.9 generate_wheel_package.py sdist bdist_wheel
$ cp dist/aichaos-0.0.1-py3-none-any.whl docker/
```
This creates a Python package file, aichaos-0.0.1-py3-none-any.whl, in the dist folder.
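To sanity-check the wheel locally before building the image, installing it into a throwaway virtual environment should work (a minimal sketch; the dist/ path comes from the step above):
```
$ python3.9 -m venv .venv && source .venv/bin/activate
$ pip3 install dist/aichaos-0.0.1-py3-none-any.whl
```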
## Build Image
```
$ cd docker
$ podman build -t aichaos:1.0 .
OR
$ docker build -t aichaos:1.0 .
```
## Run Chaos AI
```
$ podman run -v aichaos-config.json:/config/aichaos-config.json --privileged=true --name aichaos -p 5001:5001 aichaos:1.0
OR
$ docker run -v aichaos-config.json:/config/aichaos-config.json --privileged -v /var/run/docker.sock:/var/run/docker.sock --name aichaos -p 5001:5001 aichaos:1.0
```
The output should look like:
```
$ podman run -v aichaos-config.json:/config/aichaos-config.json --privileged=true --name aichaos -p 5001:5001 aichaos:1.0
* Serving Flask app 'swagger_api' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5001
* Running on http://172.17.0.2:5001
```
You can try out the APIs in a browser at http://<server-ip>:5001/apidocs (e.g. http://127.0.0.1:5001/apidocs). To test, invoke the "GenerateChaos" API with a kubeconfig file and the application URLs you want to exercise.
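The same call can be scripted from the command line; below is a minimal sketch using the GenerateChaos form parameters (file, namespace, podlabels, urls) — the host, namespace, labels, and URLs are placeholders to adapt to your environment:
```
$ curl -X POST "http://127.0.0.1:5001/GenerateChaos/" \
    -F "file=@$HOME/.kube/config" \
    -F "namespace=robot-shop" \
    -F "podlabels=service=cart,service=payment" \
    -F "urls=http://<application-url>:8097/api/cart/health,http://<application-url>:8097/api/payment/health"
$ # the response contains the chaos ID (e.g. "Chaos ID: 0"); poll it with:
$ curl "http://127.0.0.1:5001/GetStatus/0"
```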

View File

@@ -1,21 +0,0 @@
FROM bitnami/kubectl:1.20.9 as kubectl
FROM python:3.9
WORKDIR /app
RUN pip3 install --upgrade pip
COPY config config/
COPY requirements.txt .
RUN mkdir -p /app/logs
RUN pip3 install -r requirements.txt
COPY --from=kubectl /opt/bitnami/kubectl/bin/kubectl /usr/local/bin/
COPY swagger_api.py .
ENV PYTHONUNBUFFERED=1
RUN curl -fsSLO https://get.docker.com/builds/Linux/x86_64/docker-17.03.1-ce.tgz && tar --strip-components=1 -xvzf docker-17.03.1-ce.tgz -C /usr/local/bin
RUN apt-get update && apt-get install -y podman
COPY aichaos-0.0.1-py3-none-any.whl .
RUN pip3 install aichaos-0.0.1-py3-none-any.whl
CMD ["python3", "swagger_api.py"]

View File

@@ -1,7 +0,0 @@
{
"command": "podman",
"chaosengine": "kraken",
"faults": "pod-delete",
"iterations": 1,
"maxfaults": 5
}

View File

@@ -1,15 +0,0 @@
Get Log from the Chaos ID.
---
tags:
- ChaosAI API Results
parameters:
- name: chaosid
in: path
type: string
required: true
description: Chaos-ID
responses:
500:
description: Error!
200:
description: Results for the given Chaos ID.

View File

@@ -1,36 +0,0 @@
{
"apiVersion": "1.0",
"kind": "ChaosEngine",
"metadata": {
"name": "engine-cartns3"
},
"spec": {
"engineState": "active",
"annotationCheck": "false",
"appinfo": {
"appns": "robot-shop",
"applabel": "service=payment",
"appkind": "deployment"
},
"chaosServiceAccount": "pod-delete-sa",
"experiments": [
{
"name": "pod-delete",
"spec": {
"components": {
"env": [
{
"name": "FORCE",
"value": "true"
},
{
"name": "TOTAL_CHAOS_DURATION",
"value": "120"
}
]
}
}
}
]
}
}

View File

@@ -1,40 +0,0 @@
Generate chaos on an application deployed on a cluster.
---
tags:
- ChaosAI API
parameters:
- name: file
in: formData
type: file
required: true
description: Kube-config file
- name: namespace
in: formData
type: string
default: robot-shop
required: true
description: Namespace to test
- name: podlabels
in: formData
type: string
default: service=cart,service=payment
required: true
description: Pod labels to test
- name: nodelabels
in: formData
type: string
required: false
description: Node labels to test
- name: urls
in: formData
type: string
default: http://<application-url>:8097/api/cart/health,http://<application-url>:8097/api/payment/health
required: true
description: Application URLs to test
responses:
500:
description: Error!
200:
description: Chaos ID for the initiated chaos.

View File

@@ -1,15 +0,0 @@
Get Episodes from the Chaos ID.
---
tags:
- ChaosAI API Results
parameters:
- name: chaosid
in: path
type: string
required: true
description: Chaos-ID
responses:
500:
description: Error!
200:
description: Results for the given Chaos ID.

View File

@@ -1,15 +0,0 @@
Get Log from the Chaos ID.
---
tags:
- ChaosAI API Results
parameters:
- name: chaosid
in: path
type: string
required: true
description: Chaos-ID
responses:
500:
description: Error!
200:
description: Results for the given Chaos ID.

View File

@@ -1,15 +0,0 @@
Get QTable from the Chaos ID.
---
tags:
- ChaosAI API Results
parameters:
- name: chaosid
in: path
type: string
required: true
description: Chaos-ID
responses:
500:
description: Error!
200:
description: Results for the given Chaos ID.

View File

@@ -1,15 +0,0 @@
Get status of the Chaos ID.
---
tags:
- ChaosAI API
parameters:
- name: chaosid
in: path
type: string
required: true
description: Chaos-ID
responses:
500:
description: Error!
200:
description: Chaos for the given ID.

View File

@@ -1,6 +0,0 @@
numpy
pandas
requests
Flask==2.2.5
Werkzeug==3.1.5
flasgger==0.9.5

View File

@@ -1,186 +0,0 @@
import json, os
import logging
# import numpy as np
# import pandas as pd
import threading
from datetime import datetime
from flask import Flask, request
from flasgger import Swagger
from flasgger.utils import swag_from
# import zipfile
import sys
# sys.path.append("..")
from src.aichaos_main import AIChaos
app = Flask(__name__)
Swagger(app)
flaskdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "app", "logs") + '/'
class AIChaosSwagger:
def __init__(self, flaskdir=''):
self.flaskdir = flaskdir
@app.route("/")
def empty(params=''):
return "AI Chaos Repository!"
def startchaos(self, kubeconfigfile, file_id, params):
print('[StartChaos]', file_id, kubeconfigfile)
dir = flaskdir
outfile = ''.join([dir, 'out-', file_id])
initfile = ''.join([dir, 'init-', file_id])
with open(initfile, 'w'):
pass
if os.path.exists(outfile):
os.remove(outfile)
# kubeconfigfile = params['file']
os.environ["KUBECONFIG"] = kubeconfigfile
os.system("export KUBECONFIG="+kubeconfigfile)
os.system("echo $KUBECONFIG")
print('setting kubeconfig')
params['command'] = 'podman'
params['chaosengine'] = 'kraken'
params['faults'] = 'pod-delete'
params['iterations'] = 1
params['maxfaults'] = 5
if os.path.isfile('/config/aichaos-config.json'):
with open('/config/aichaos-config.json') as f:
config_params = json.load(f)
params['command'] = config_params['command']
params['chaosengine'] = config_params['chaosengine']
params['faults']= config_params['faults']
params['iterations'] = config_params['iterations']
params['maxfaults'] = config_params['maxfaults']
# faults = [f + ':' + p for f in params['faults'].split(',') for p in params['podlabels'].split(',')]
faults = []
for f in params['faults'].split(','):
if f in ['pod-delete']:
for p in params['podlabels'].split(','):
faults.append(f + ':' + p)
elif f in ['network-chaos', 'node-memory-hog', 'node-cpu-hog']:
for p in params['nodelabels'].split(','):
faults.append(f + ':' + p)
else:
pass
print('#faults:', len(faults), faults)
states = {'200': 0, '500': 1, '501': 2, '502': 3, '503': 4, '504': 5,
'401': 6, '403': 7, '404': 8, '429': 9,
'Timeout': 10, 'Other': 11}
rewards = {'200': -1, '500': 0.8, '501': 0.8, '502': 0.8, '503': 0.8, '504': 0.8,
'401': 1, '403': 1, '404': 1, '429': 1,
'Timeout': 1, 'Other': 1}
logfile = self.flaskdir + 'log_' + str(file_id)
qfile = self.flaskdir + 'qfile_' + str(file_id) + '.csv'
efile = self.flaskdir + 'efile_' + str(file_id)
epfile = self.flaskdir + 'episodes_' + str(file_id) + '.json'
# probe_url = params['probeurl']
cexp = {'pod-delete': 'pod-delete.json', 'cpu-hog': 'pod-cpu-hog.json',
'disk-fill': 'disk-fill.json', 'network-loss': 'network-loss.json',
'network-corruption': 'network-corruption.json', 'io-stress': 'io-stress.json'}
aichaos = AIChaos(states=states, faults=faults, rewards=rewards,
logfile=logfile, qfile=qfile, efile=efile, epfile=epfile,
urls=params['urls'].split(','), namespace=params['namespace'],
max_faults=int(params['maxfaults']),
num_requests=10, timeout=2,
chaos_engine=params['chaosengine'],
chaos_dir='config/', kubeconfig=kubeconfigfile,
loglevel=logging.DEBUG, chaos_experiment=cexp, iterations=int(params['iterations']),
command=params['command'])
print('checking kubeconfig')
os.system("echo $KUBECONFIG")
aichaos.start_chaos()
file = open(outfile, "w")
file.write('done')
file.close()
os.remove(initfile)
# os.remove(csvfile)
# ConstraintsInference().remove_temp_files(dir, file_id)
return 'WRITE'
@app.route('/GenerateChaos/', methods=['POST'])
@swag_from('config/yml/chaosGen.yml')
def chaos_gen():
dir = flaskdir
sw = AIChaosSwagger(flaskdir=dir)
f = request.files['file']
list = os.listdir(dir)
for i in range(10000):
fname = 'kubeconfig-'+str(i)
if fname not in list:
break
kubeconfigfile = ''.join([dir, 'kubeconfig-', str(i)])
f.save(kubeconfigfile)
# creating empty file
open(kubeconfigfile, 'a').close()
# print('HEADER:', f.headers)
print('[GenerateChaos] reqs:', request.form.to_dict())
# print('[GenerateChaos]', f.filename, datetime.now())
thread = threading.Thread(target=sw.startchaos, args=(kubeconfigfile, str(i), request.form.to_dict()))
thread.daemon = True
print(thread.getName())
thread.start()
return 'Chaos ID: ' + str(i)
@app.route('/GetStatus/<chaosid>', methods=['GET'])
@swag_from('config/yml/status.yml')
def get_status(chaosid):
print('[GetStatus]', chaosid, flaskdir)
epfile = flaskdir + 'episodes_' + str(chaosid) + '.json'
initfile = ''.join([flaskdir, 'init-', chaosid])
if os.path.exists(epfile):
return 'Completed'
elif os.path.exists(initfile):
return 'Running'
else:
return 'Does not exist'
@app.route('/GetQTable/<chaosid>', methods=['GET'])
@swag_from('config/yml/qtable.yml')
def get_qtable(chaosid):
print('[GetQTable]', chaosid)
qfile = flaskdir + 'qfile_' + str(chaosid) + '.csv'
initfile = ''.join([flaskdir, 'init-', chaosid])
if os.path.exists(qfile):
f = open(qfile, "r")
return f.read()
elif os.path.exists(initfile):
return 'Running'
else:
return 'Invalid Chaos ID: ' + chaosid
@app.route('/GetEpisodes/<chaosid>', methods=['GET'])
@swag_from('config/yml/episodes.yml')
def get_episodes(chaosid):
print('[GetEpisodes]', chaosid)
epfile = flaskdir + 'episodes_' + str(chaosid) + '.json'
initfile = ''.join([flaskdir, 'init-', chaosid])
if os.path.exists(epfile):
f = open(epfile, "r")
return f.read()
elif os.path.exists(initfile):
return 'Running'
else:
return 'Invalid Chaos ID: ' + chaosid
@app.route('/GetLog/<chaosid>', methods=['GET'])
@swag_from('config/yml/log.yml')
def get_log(chaosid):
print('[GetLog]', chaosid)
epfile = flaskdir + 'log_' + str(chaosid)
initfile = ''.join([flaskdir, 'init-', chaosid])
if os.path.exists(epfile):
f = open(epfile, "r")
return f.read()
elif os.path.exists(initfile):
return 'Running'
else:
return 'Invalid Chaos ID: ' + chaosid
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port='5001')

View File

@@ -1,21 +0,0 @@
import setuptools
# from setuptools_cythonize import get_cmdclass
setuptools.setup(
# cmdclass=get_cmdclass(),
name="aichaos",
version="0.0.1",
author="Sandeep Hans",
author_email="shans001@in.ibm.com",
description="Chaos AI",
long_description="Chaos Engineering using AI",
long_description_content_type="text/markdown",
url="",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires='>=3.9',
)

View File

@@ -1,11 +0,0 @@
numpy
pandas
notebook
jupyterlab
jupyter
seaborn==0.13.2
requests
wheel
Flask==2.2.5
flasgger==0.9.5
pillow==10.3.0

View File

@@ -1,213 +0,0 @@
import json
import os
import random
import sys
import numpy as np
import logging
class AIChaos:
def __init__(self, states=None, faults=None, rewards=None, pod_names=[], chaos_dir=None,
chaos_experiment='experiment.json',
chaos_journal='journal.json', iterations=1000, static_run=False):
self.faults = faults
self.pod_names = pod_names
self.states = states
self.rewards = rewards
self.episodes = []
self.chaos_dir = chaos_dir
self.chaos_experiment = chaos_experiment
self.chaos_journal = chaos_journal
self.iterations = iterations
# Initialize parameters
self.gamma = 0.75 # Discount factor
self.alpha = 0.9 # Learning rate
# Initializing Q-Values
# self.Q = np.array(np.zeros([9, 9]))
# self.Q = np.array(np.zeros([len(faults), len(faults)]))
# currently action is a single fault, later on we will do multiple faults together
# For multiple faults, the no of cols in q-matrix will be all combinations of faults (infinite)
# eg. {f1,f2},f3,f4,{f4,f5} - f1,f2 in parallel, then f3, then f4, then f4,f5 in parallel produces end state
# self.Q = np.array(np.zeros([len(states), len(states)]))
self.Q = np.array(np.zeros([len(states), len(faults)]))
self.state_matrix = np.array(np.zeros([len(states), len(states)]))
# may be Q is a dictionary of dictionaries, for each state there is a dictionary of faults
# Q = {'500' = {'f1f2f4': 0.3, 'f1': 0.5}, '404' = {'f2': 0.22}}
self.logger = logging.getLogger()
# run from old static experiment and journal files
self.static_run = static_run
# End state is reached when system is down or return error code like '500','404'
def get_next_state(self):
self.logger.info('[GET_NEXT_STATE]')
f = open(self.chaos_dir + self.chaos_journal)
data = json.load(f)
# before the experiment (if before steady state is false, after is null?)
for probe in data['steady_states']['before']['probes']:
if not probe['tolerance_met']:
# start_state = probe['activity']['tolerance']
# end_state = probe['status']
start_state, end_state = None, None
return start_state, end_state
# after the experiment
for probe in data['steady_states']['after']['probes']:
# if probe['output']['status'] == probe['activity']['tolerance']:
if not probe['tolerance_met']:
# print(probe)
start_state = probe['activity']['tolerance']
end_state = probe['output']['status']
# end_state = probe['status']
return start_state, end_state
# if tolerances for all probes are met
start_state = probe['activity']['tolerance']
end_state = probe['activity']['tolerance']
return start_state, end_state
def inject_faults(self, fault, pod_name):
self.logger.info('[INJECT_FAULT] ' + fault)
f = open(self.chaos_dir + self.chaos_experiment)
data = json.load(f)
for m in data['method']:
if 'provider' in m:
if fault == 'kill_microservice':
m['name'] = 'kill-microservice'
m['provider']['module'] = 'chaosk8s.actions'
m['provider']['arguments']['name'] = pod_name
else:
m['provider']['arguments']['name_pattern'] = pod_name
m['provider']['func'] = fault
print('[INJECT_FAULT] method:', m)
# self.logger.info('[INJECT_FAULT] ' + m['provider']['arguments']['name_pattern'])
# self.logger.info('[INJECT_FAULT] ' + str(m))
exp_file = self.chaos_dir + 'experiment_' + str(random.randint(1, 10)) + '.json'
with open(exp_file, 'w') as f:
json.dump(data, f)
exp_file = self.chaos_dir + 'experiment.json'
# execute faults
# cmd = 'cd ' + self.chaos_dir + ';chaos run ' + self.chaos_experiment
cmd = 'cd ' + self.chaos_dir + ';chaos run ' + exp_file
if not self.static_run:
os.system(cmd)
def create_episode(self):
self.logger.info('[CREATE_EPISODE]')
episode = []
while True:
# inject more faults
# TODO: model - choose faults based on q-learning ...
fault_pod = random.choice(self.faults)
fault = fault_pod.split(':')[0]
pod_name = fault_pod.split(':')[1]
# fault = random.choice(self.faults)
# pod_name = random.choice(self.pod_names)
# fault = lstm_model.get_next_fault(episode)
# fault = get_max_prob_fault(episode)
self.inject_faults(fault, pod_name)
start_state, next_state = self.get_next_state()
print('[CREATE EPISODE]', start_state, next_state)
# if before state tolerance is not met
if start_state is None and next_state is None:
continue
episode.append({'fault': fault, 'pod_name': pod_name})
self.update_q_fault(fault_pod, episode, start_state, next_state)
# self.update_q_fault(fault, episode, start_state, next_state)
# if an end_state is reached
# if next_state is not None:
if start_state != next_state:
self.logger.info('[CREATE_EPISODE] EPISODE CREATED:' + str(episode))
self.logger.info('[CREATE_EPISODE] END STATE:' + str(next_state))
return episode, start_state, next_state
def update_q_fault(self, fault, episode, start_state, end_state):
self.logger.info('[UPDATE_Q]')
print('[UPDATE_Q] ', str(start_state), str(end_state))
if end_state is None:
end_state = start_state
# reward is dependent on the error response (eg. '404') and length of episode
reward = self.rewards[str(end_state)] / len(episode)
current_state = self.states[str(start_state)]
next_state = self.states[str(end_state)]
fault_index = self.faults.index(fault)
TD = reward + \
self.gamma * self.Q[next_state, np.argmax(self.Q[next_state,])] - \
self.Q[current_state, fault_index]
self.Q[current_state, fault_index] += self.alpha * TD
# update state matrix
TD_state = reward + \
self.gamma * self.state_matrix[next_state, np.argmax(self.state_matrix[next_state,])] - \
self.state_matrix[current_state, next_state]
self.state_matrix[current_state, next_state] += self.alpha * TD_state
# def update_q(self, episode, start_state, end_state):
# self.logger.info('[UPDATE_Q]')
# if end_state is None:
# end_state = start_state
#
# # reward is dependent on the error response (eg. '404') and length of episode
# reward = self.rewards[str(end_state)] / len(episode)
# current_state = self.states[str(start_state)]
# next_state = self.states[str(end_state)]
# TD = reward + \
# self.gamma * self.Q[next_state, np.argmax(self.Q[next_state,])] - \
# self.Q[current_state, next_state]
# self.Q[current_state, next_state] += self.alpha * TD
def start_chaos(self):
for i in range(self.iterations):
episode, start_state, end_state = self.create_episode()
# update Q matrix
# will do it with each fault injection
# self.update_q(episode, start_state, end_state)
print(self.Q)
print(self.state_matrix)
def test_chaos():
svc_list = ['cart', 'catalogue', 'dispatch', 'mongodb', 'mysql', 'payment', 'rabbitmq', 'ratings', 'redis',
'shipping', 'user', 'web']
# Define faults
# faults = ['terminate_pods']
# faults = ['terminate_pods:' + x for x in pod_names]
faults = ['kill_microservice:' + x for x in svc_list]
# Define the states
states = {
'200': 0,
'500': 1,
'404': 2
}
# Define rewards, currently not used
rewards = {
'200': 0,
'500': 0.8,
'404': 1
}
# cdir = '/Users/sandeephans/Downloads/chaos/chaostoolkit-samples-master/service-down-not-visible-to-users/'
cdir = '/Users/sandeephans/Downloads/openshift/'
cexp = 'experiment.json'
cjournal = 'journal.json'
aichaos = AIChaos(states=states, faults=faults, rewards=rewards,
chaos_dir=cdir, chaos_experiment=cexp, chaos_journal=cjournal,
static_run=False)
aichaos.start_chaos()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
test_chaos()

View File

@@ -1,248 +0,0 @@
import json
import os
import random
import numpy as np
import pandas as pd
import logging
# sys.path.insert(1, os.path.join(sys.path[0], '..'))
import src.utils as utils
from src.kraken_utils import KrakenUtils
from src.qlearning import QLearning
from src.test_application import TestApplication
class AIChaos:
def __init__(self, namespace='robot-shop', states=None, faults=None, rewards=None, urls=[], max_faults=5,
service_weights=None, ctd_subsets=None, pod_names=[], chaos_dir='../config/', kubeconfig='~/.kube/config',
chaos_experiment='experiment.json', logfile='log', qfile='qfile.csv', efile='efile', epfile='episodes.json',
loglevel=logging.INFO,
chaos_journal='journal.json', iterations=10, alpha=0.9, gamma=0.2, epsilon=0.3,
num_requests=10, sleep_time=1, timeout=2, chaos_engine='kraken', dstk_probes=None,
static_run=False, all_faults=False, command='podman'):
self.namespace = namespace
self.faults = faults
self.unused_faults = faults.copy()
self.all_faults = all_faults
self.pod_names = pod_names
self.states = states
self.rewards = rewards
self.urls = urls
self.max_faults = max_faults
self.episodes = []
self.service_weights = service_weights
self.ctd_subsets = ctd_subsets
self.kubeconfig = kubeconfig
self.chaos_dir = chaos_dir
self.chaos_experiment = chaos_experiment
self.chaos_journal = chaos_journal
self.command = command
if chaos_engine == 'kraken':
self.chaos_engine = KrakenUtils(namespace, kubeconfig=kubeconfig, chaos_dir=chaos_dir, chaos_experiment=chaos_experiment, command=self.command)
else:
self.chaos_engine = None
self.iterations = iterations
# Initialize RL parameters
self.epsilon = epsilon # epsilon decay policy
# self.epsdecay = 0
# log files
self.logfile = logfile
self.qfile = qfile
self.efile = efile
self.epfile = epfile
open(efile, 'w+').close()
open(logfile, 'w+').close()
open(logfile, 'r+').truncate(0)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.basicConfig(filename=logfile, filemode='w+', level=loglevel)
self.logger = logging.getLogger(logfile.replace('/',''))
self.logger.addHandler(logging.FileHandler(logfile))
self.testapp = TestApplication(num_requests, timeout, sleep_time)
self.ql = QLearning(gamma, alpha, faults, states, rewards, urls)
# run from old static experiment and journal files
self.static_run = static_run
def realistic(self, faults_pods):
self.logger.debug('[Realistic] ' + str(faults_pods))
fp = faults_pods.copy()
for f1 in faults_pods:
for f2 in faults_pods:
if f1 == f2:
continue
if f1 in fp and f2 in fp:
f1_fault, load_1 = utils.get_load(f1.split(':')[0])
f1_pod = f1.split(':')[1]
f2_fault, load_2 = utils.get_load(f2.split(':')[0])
f2_pod = f2.split(':')[1]
if f1_pod == f2_pod:
if f1_fault == 'pod-delete':
fp.remove(f2)
if f1_fault == f2_fault:
# if int(load_1) > int(load_2):
# randomly remove one fault from same faults with different params
fp.remove(f2)
if self.service_weights is None:
return fp
fp_copy = fp.copy()
for f in fp:
f_fault = f.split(':')[0]
f_pod = f.split(':')[1].replace('service=', '')
self.logger.debug('[ServiceWeights] ' + f + ' ' + str(self.service_weights[f_pod][f_fault]))
if self.service_weights[f_pod][f_fault] == 0:
fp_copy.remove(f)
self.logger.debug('[Realistic] ' + str(fp_copy))
return fp_copy
def select_faults(self):
max_faults = min(self.max_faults, len(self.unused_faults))
num_faults = random.randint(1, max_faults)
if self.all_faults:
num_faults = len(self.unused_faults)
if random.random() > self.epsilon:
self.logger.info('[Exploration]')
# faults_pods = random.sample(self.faults, k=num_faults)
# using used faults list to avoid starvation
faults_pods = random.sample(self.unused_faults, k=num_faults)
faults_pods = self.realistic(faults_pods)
for f in faults_pods:
self.unused_faults.remove(f)
if len(self.unused_faults) == 0:
self.unused_faults = self.faults.copy()
else:
self.logger.info('[Exploitation]')
first_row = self.ql.Q[:, 0, :][0]
top_k_indices = np.argpartition(first_row, -num_faults)[-num_faults:]
faults_pods = [self.faults[i] for i in top_k_indices]
faults_pods = self.realistic(faults_pods)
return faults_pods
def create_episode(self, ctd_subset=None):
self.logger.debug('[CREATE_EPISODE]')
episode = []
if ctd_subset is None:
faults_pods = self.select_faults()
else:
faults_pods = ctd_subset
self.logger.info('CTD Subset: ' + str(faults_pods))
# faults_pods = self.realistic(faults_pods)
if len(faults_pods) == 0:
return [], 200, 200
engines = []
for fp in faults_pods:
fault = fp.split(':')[0]
pod_name = fp.split(':')[1]
engine = self.chaos_engine.inject_faults(fault, pod_name)
engines.append(engine)
episode.append({'fault': fault, 'pod_name': pod_name})
self.logger.info('[create_episode]' + str(faults_pods))
engines_running = self.chaos_engine.wait_engines(engines)
self.logger.info('[create_episode] engines_running' + str(engines_running))
if not engines_running:
return None, None, None
# randomly shuffling urls
urls = random.sample(self.urls, len(self.urls))
ep_json = []
for url in urls:
start_state, next_state = self.testapp.test_load(url)
self.logger.info('[CREATE EPISODE]' + str(start_state) + ',' + str(next_state))
# if before state tolerance is not met
if start_state is None and next_state is None:
# self.cleanup()
self.chaos_engine.stop_engines()
continue
### episode.append({'fault': fault, 'pod_name': pod_name})
# self.update_q_fault(fault_pod, episode, start_state, next_state)
url_index = self.urls.index(url)
self.logger.info('[CREATEEPISODE]' + str(url) + ':' + str(url_index))
for fp in faults_pods:
self.ql.update_q_fault(fp, episode, start_state, next_state, self.urls.index(url))
ep_json.append({'start_state': start_state, 'next_state': next_state, 'url': url, 'faults': episode})
self.logger.debug('[CREATE_EPISODE] EPISODE CREATED:' + str(episode))
self.logger.debug('[CREATE_EPISODE] END STATE:' + str(next_state))
self.chaos_engine.print_result(engines)
self.chaos_engine.stop_engines(episode=episode)
# ep_json = {'start_state': start_state, 'next_state': next_state, 'faults': episode}
return ep_json, start_state, next_state
def start_chaos(self):
self.logger.info('[INITIALIZING]')
self.logger.info('Logfile: '+self.logfile)
self.logger.info('Loggerfile: '+self.logger.handlers[0].stream.name)
self.logger.info('Chaos Engine: ' + self.chaos_engine.get_name())
self.logger.debug('Faults:' + str(self.faults))
self.chaos_engine.cleanup()
if self.ctd_subsets is None:
for i in range(self.iterations):
episode, start_state, end_state = self.create_episode()
self.logger.debug('[start_chaos]' + str(i) + ' ' + str(episode))
if episode is None:
continue
# update Q matrix
# will do it with each fault injection
# self.update_q(episode, start_state, end_state)
# if episode['next_state'] != '200':
self.episodes.extend(episode)
self.logger.info(str(i) + ' ' + str(self.ql.Q[:, 0]))
# print(i, self.state_matrix)
self.write_q()
self.write_episode(episode)
else:
for i, subset in enumerate(self.ctd_subsets):
episode, start_state, end_state = self.create_episode(subset)
self.logger.debug('[start_chaos]' + str(episode))
if episode is None:
continue
self.episodes.append(episode)
self.logger.info(str(i) + ' ' + str(self.ql.Q[:, 0]))
self.write_q()
self.write_episode(episode)
self.chaos_engine.cleanup()
# self.remove_temp_file()
with open(self.epfile, 'w', encoding='utf-8') as f:
json.dump(self.episodes, f, ensure_ascii=False, indent=4)
self.logger.info('COMPLETE!!!')
def write_q(self):
df = pd.DataFrame(self.ql.Q[:, 0, :], index=self.urls, columns=self.faults)
df.to_csv(self.qfile)
return df
def write_episode(self, episode):
for ep in episode:
with open(self.efile, "a") as outfile:
x = [e['fault'] + ':' + e['pod_name'] for e in ep['faults']]
x.append(ep['url'])
x.append(str(ep['next_state']))
outfile.write(','.join(x) + '\n')
def remove_temp_file(self):
mydir = self.chaos_dir + 'experiments'
print('Removing temp files from: '+mydir)
self.logger.debug('Removing temp files: '+mydir)
        # nothing to clean up if the experiments directory does not exist
        if not os.path.exists(mydir):
            return
filelist = [f for f in os.listdir(mydir) if f.endswith(".json")]
for f in filelist:
print(f)
os.remove(os.path.join(mydir, f))

View File

@@ -1,56 +0,0 @@
import random
class Experiments:
def __init__(self):
self.k = 0
def monotonic(self, aichaos, num_sets=3):
for i in range(num_sets):
faults_pods = random.sample(aichaos.faults, k=2)
faults_set = [[faults_pods[0]], [faults_pods[1]], [faults_pods[0], faults_pods[1]]]
resp1, resp2, resp_both = 0, 0, 0
for fl in faults_set:
engines = []
for fp in fl:
fault = fp.split(':')[0]
pod_name = fp.split(':')[1]
engine = aichaos.inject_faults_litmus(fault, pod_name)
engines.append(engine)
aichaos.litmus.wait_engines(engines)
for index, url in enumerate(aichaos.urls):
start_state, next_state = aichaos.test_load(url)
print(i, fl, next_state)
# self.write(str(fl), next_state)
if resp1 == 0:
resp1 = next_state
elif resp2 == 0:
resp2 = next_state
else:
resp_both = next_state
aichaos.litmus.stop_engines()
self.write_resp(str(faults_set[2]), resp1, resp2, resp_both)
print('Experiment Complete!!!')
@staticmethod
def write(fault, next_state):
with open("experiment", "a") as outfile:
outfile.write(fault + ',' + str(next_state) + ',' + '\n')
@staticmethod
def write_resp(faults, resp1, resp2, resp3):
monotonic = True
if resp3 == 200:
if resp1 != 200 or resp2 != 200:
monotonic = False
else:
if resp1 == 200 and resp2 == 200:
monotonic = False
with open("experiment", "a") as outfile:
# outfile.write(faults + ',' + str(resp1) + ',' + '\n')
outfile.write(faults + ',' + str(resp1) + ',' + str(resp2) + ',' + str(resp3) + ',' + str(monotonic) + '\n')

View File

@@ -1,99 +0,0 @@
import json
import os
import time
import logging
import src.utils as utils
class KrakenUtils:
def __init__(self, namespace='robot-shop', chaos_dir='../config/',
chaos_experiment='experiment.json', kubeconfig='~/.kube/config', wait_checks=60, command='podman'):
self.chaos_dir = chaos_dir
self.chaos_experiment = chaos_experiment
self.namespace = namespace
self.kubeconfig = kubeconfig
self.logger = logging.getLogger()
self.engines = []
self.wait_checks = wait_checks
self.command = command
def exp_status(self, engine='engine-cartns3'):
substring_list = ['Waiting for the specified duration','Waiting for wait_duration', 'Step workload started, waiting for response']
substr = '|'.join(substring_list)
# cmd = "docker logs "+engine+" 2>&1 | grep Waiting"
# cmd = "docker logs "+engine+" 2>&1 | grep -E '"+substr+"'"
cmd = self.command +" logs "+engine+" 2>&1 | grep -E '"+substr+"'"
line = os.popen(cmd).read()
self.logger.debug('[exp_status]'+line)
# if 'Waiting for the specified duration' in line:
# if 'Waiting for' in line or 'waiting for' in line:
# if 'Waiting for the specified duration' in line or 'Waiting for wait_duration' in line or 'Step workload started, waiting for response' in line:
if any(map(line.__contains__, substring_list)):
return 'Running'
return 'Not Running'
# print chaos result, check if litmus showed any error
def print_result(self, engines):
# self.logger.debug('')
for e in engines:
# cmd = 'kubectl describe chaosresult ' + e + ' -n ' + self.namespace + ' | grep "Fail Step:"'
# line = os.popen(cmd).read()
# self.logger.debug('[Chaos Result] '+e+' : '+line)
self.logger.debug('[KRAKEN][Chaos Result] '+e)
def wait_engines(self, engines=[]):
status = 'Completed'
max_checks = self.wait_checks
for e in engines:
self.logger.info('[Wait Engines] ' + e)
for i in range(max_checks):
status = self.exp_status(e)
if status == 'Running':
break
time.sleep(1)
# return False, if even one engine is not running
if status != 'Running':
return False
self.engines = engines
# return True if all engines are running
return True
def cleanup(self):
self.logger.debug('Removing previous engines')
# cmd = "docker rm $(docker ps -q -f 'status=exited')"
if len(self.engines) > 0:
cmd = self.command+" stop " + " ".join(self.engines) + " >> temp"
os.system(cmd)
self.engines = []
cmd = self.command+" container prune -f >> temp"
os.system(cmd)
self.logger.debug('Engines removed')
def stop_engines(self, episode=[]):
self.cleanup()
def get_name(self):
return 'kraken'
def inject_faults(self, fault, pod_name):
self.logger.debug('[KRAKEN][INJECT_FAULT] ' + fault + ':' + pod_name)
fault, load = utils.get_load(fault)
engine = 'engine-' + pod_name.replace('=', '-').replace('/','-') + '-' + fault
if fault == 'pod-delete':
cmd = self.command+' run -d -e NAMESPACE='+self.namespace+' -e POD_LABEL='+pod_name+' --name='+engine+' --net=host -v '+self.kubeconfig+':/root/.kube/config:Z quay.io/redhat-chaos/krkn-hub:pod-scenarios >> temp'
elif fault == 'network-chaos':
# 'docker run -e NODE_NAME=minikube-m03 -e DURATION=10 --name=knetwork --net=host -v /home/chaos/.kube/kube-config-raw:/root/.kube/config:Z -d quay.io/redhat-chaos/krkn-hub:network-chaos >> temp'
cmd = self.command+' run -d -e NODE_NAME='+pod_name+' -e DURATION=120 --name='+engine+' --net=host -v '+self.kubeconfig+':/root/.kube/config:Z -d quay.io/redhat-chaos/krkn-hub:network-chaos >> temp'
elif fault == 'node-memory-hog':
cmd = self.command+' run -d -e NODE_NAME='+pod_name+' -e DURATION=120 -e NODES_AFFECTED_PERC=100 --name='+engine+' --net=host -v '+self.kubeconfig+':/root/.kube/config:Z -d quay.io/redhat-chaos/krkn-hub:node-memory-hog >> temp'
elif fault == 'node-cpu-hog':
cmd = self.command+' run -e NODE_SELECTORS='+pod_name+' -e NODE_CPU_PERCENTAGE=100 -e NAMESPACE='+self.namespace+' -e TOTAL_CHAOS_DURATION=120 -e NODE_CPU_CORE=100 --name='+engine+' --net=host -env-host=true -v '+self.kubeconfig+':/root/.kube/config:Z -d quay.io/redhat-chaos/krkn-hub:node-cpu-hog'
else:
cmd = 'echo'
self.logger.debug('[KRAKEN][INJECT_FAULT] ' + cmd)
os.system(cmd)
return engine

View File

@@ -1,62 +0,0 @@
import logging
import numpy as np


class QLearning:
    def __init__(self, gamma=None, alpha=None, faults=None, states=None, rewards=None, urls=None):
        self.gamma = gamma  # Discount factor
        self.alpha = alpha  # Learning rate
        self.faults = faults
        self.states = states
        self.rewards = rewards
        # Initializing Q-Values
        # self.Q = np.array(np.zeros([len(states), len(states)]))
        self.Q = np.array(np.zeros([len(urls), len(states), len(faults)]))
        self.state_matrix = np.array(np.zeros([len(states), len(states)]))
        self.logger = logging.getLogger()

    def update_q_fault(self, fault, episode, start_state, end_state, url_index):
        self.logger.info('[UPDATE_Q] ' + str(url_index) + ' ' + fault + ' ' + str(start_state) + '->' + str(end_state))
        if end_state is None:
            end_state = start_state
        if end_state not in self.states:
            end_state = 'Other'
        # reward is dependent on the error response (eg. '404') and length of episode
        reward = self.rewards[str(end_state)] / len(episode)
        current_state = self.states[str(start_state)]
        next_state = self.states[str(end_state)]
        fault_index = self.faults.index(fault)
        # self.logger.debug('[update_q]' + fault + ' ' + str(fault_index) + ' ' + str(reward))
        # self.logger.debug('reward, gamma: ' + str(reward) + ' ' + str(self.gamma))
        # self.logger.debug(
        #     'gamma*val' + str(self.gamma * self.Q[url_index, next_state, np.argmax(self.Q[url_index, next_state,])]))
        # self.logger.debug('current state val:' + str(self.Q[url_index, current_state, fault_index]))
        TD = reward + \
             self.gamma * self.Q[url_index, next_state, np.argmax(self.Q[url_index, next_state,])] - \
             self.Q[url_index, current_state, fault_index]
        self.Q[url_index, current_state, fault_index] += self.alpha * TD
        # update state matrix
        TD_state = reward + \
                   self.gamma * self.state_matrix[next_state, np.argmax(self.state_matrix[next_state,])] - \
                   self.state_matrix[current_state, next_state]
        self.state_matrix[current_state, next_state] += self.alpha * TD_state
        # self.logger.debug('updated Q' + str(self.Q[url_index, current_state, fault_index]))

    # def update_q(self, episode, start_state, end_state):
    #     self.logger.info('[UPDATE_Q]')
    #     if end_state is None:
    #         end_state = start_state
    #
    #     # reward is dependent on the error response (eg. '404') and length of episode
    #     reward = self.rewards[str(end_state)] / len(episode)
    #     current_state = self.states[str(start_state)]
    #     next_state = self.states[str(end_state)]
    #     TD = reward + \
    #          self.gamma * self.Q[next_state, np.argmax(self.Q[next_state,])] - \
    #          self.Q[current_state, next_state]
    #     self.Q[current_state, next_state] += self.alpha * TD
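
Concretely, update_q_fault applies a standard temporal-difference update keyed by probe URL: the reward for reaching end_state is divided by the episode length, and Q[url, state, fault] moves toward reward + gamma * max over faults of Q[url, next_state, ·]. A toy run with made-up numbers (the gamma, alpha, states, faults and rewards below are illustrative only, not repository defaults):

import numpy as np

# Illustrative values only.
gamma, alpha = 0.75, 0.9
states = {'200': 0, '500': 1}
faults = ['pod-delete:app=cart', 'node-cpu-hog:node-1']
rewards = {'200': -1, '500': 0.8}
Q = np.zeros([1, len(states), len(faults)])   # [urls, states, faults]; one URL here

url_index, fault_index = 0, 0
episode = ['pod-delete:app=cart']             # shorter episodes earn a larger reward
start_state, end_state = '200', '500'

reward = rewards[end_state] / len(episode)
current_state, next_state = states[start_state], states[end_state]
TD = reward + gamma * Q[url_index, next_state, np.argmax(Q[url_index, next_state, :])] \
     - Q[url_index, current_state, fault_index]
Q[url_index, current_state, fault_index] += alpha * TD
print(Q[url_index, current_state, fault_index])   # ~0.72: pod-delete now looks promising for this URL

Keying the Q table by probe URL lets the agent learn a separate fault ranking for every endpoint it monitors, while state_matrix keeps a single URL-independent view of state transitions.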


@@ -1,171 +0,0 @@
import json, os
import logging
# import numpy as np
# import pandas as pd
import threading
from datetime import datetime

from flask import Flask, request
from flasgger import Swagger
from flasgger.utils import swag_from
# import zipfile
import sys
sys.path.append("..")
from aichaos_main import AIChaos

app = Flask(__name__)
Swagger(app)
flaskdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config", "experiments",
                        "flask") + '/'


class AIChaosSwagger:
    def __init__(self, flaskdir=''):
        self.flaskdir = flaskdir

    @app.route("/")
    def empty(params=''):
        return "AI Chaos Repository!"

    def startchaos(self, kubeconfigfile, file_id, params):
        print('[StartChaos]', file_id, kubeconfigfile)
        dir = flaskdir
        outfile = ''.join([dir, 'out-', file_id])
        initfile = ''.join([dir, 'init-', file_id])
        with open(initfile, 'w'):
            pass
        if os.path.exists(outfile):
            os.remove(outfile)
        # cons = ConstraintsInference(outdir=dir).get_constraints(csvfile, file_id, params, verbose=False,
        #                                                         write_local=False)
        os.environ["KUBECONFIG"] = kubeconfigfile
        params['command'] = 'podman'
        params['chaos_engine'] = 'kraken'
        params['faults'] = 'pod-delete'
        params['iterations'] = 1
        params['maxfaults'] = 5
        if os.path.isfile('/config/aichaos-config.json'):
            with open('/config/aichaos-config.json') as f:
                config_params = json.load(f)
            params['command'] = config_params['command']
            params['chaos_engine'] = config_params['chaos_engine']
            params['faults'] = config_params['faults']
            params['iterations'] = config_params['iterations']
            params['maxfaults'] = config_params['maxfaults']
        faults = [f + ':' + p for f in params['faults'].split(',') for p in params['podlabels'].split(',')]
        print('#faults:', len(faults), faults)
        states = {'200': 0, '500': 1, '502': 2, '503': 3, '404': 4, 'Timeout': 5}
        rewards = {'200': -1, '500': 0.8, '502': 0.8, '503': 0.8, '404': 1, 'Timeout': 1}
        logfile = self.flaskdir + 'log_' + str(file_id)
        qfile = self.flaskdir + 'qfile_' + str(file_id) + '.csv'
        efile = self.flaskdir + 'efile_' + str(file_id)
        epfile = self.flaskdir + 'episodes_' + str(file_id) + '.json'
        probe_url = params['probeurl']
        probes = {'pod-delete': 'executeprobe', 'cpu-hog': 'wolffi/cpu_load', 'disk-fill': 'wolffi/memory_load',
                  'io_load': 'wolffi/io_load', 'http_delay': 'wolffi/http_delay', 'packet_delay': 'wolffi/packet_delay',
                  'packet_duplication': 'wolffi/packet_duplication', 'packet_loss': 'wolffi/packet_loss',
                  'packet_corruption': 'wolffi/packet_corruption',
                  'packet_reordering': 'wolffi/packet_reordering', 'network_load': 'wolffi/network_load',
                  'http_bad_request': 'wolffi/http_bad_request',
                  'http_unauthorized': 'wolffi/http_unauthorized', 'http_forbidden': 'wolffi/http_forbidden',
                  'http_not_found': 'wolffi/http_not_found',
                  'http_method_not_allowed': 'wolffi/http_method_not_allowed',
                  'http_not_acceptable': 'wolffi/http_not_acceptable',
                  'http_request_timeout': 'wolffi/http_request_timeout',
                  'http_unprocessable_entity': 'wolffi/http_unprocessable_entity',
                  'http_internal_server_error': 'wolffi/http_internal_server_error',
                  'http_not_implemented': 'wolffi/http_not_implemented',
                  'http_bad_gateway': 'wolffi/http_bad_gateway',
                  'http_service_unavailable': 'wolffi/http_service_unavailable',
                  'bandwidth_restrict': 'wolffi/bandwidth_restrict',
                  'pod_cpu_load': 'wolffi/pod_cpu_load', 'pod_memory_load': 'wolffi/pod_memory_load',
                  'pod_io_load': 'wolffi/pod_io_load',
                  'pod_network_load': 'wolffi/pod_network_load'
                  }
        dstk_probes = {k: probe_url + v for k, v in probes.items()}
        cexp = {'pod-delete': 'pod-delete.json', 'cpu-hog': 'pod-cpu-hog.json',
                'disk-fill': 'disk-fill.json', 'network-loss': 'network-loss.json',
                'network-corruption': 'network-corruption.json', 'io-stress': 'io-stress.json'}
        aichaos = AIChaos(states=states, faults=faults, rewards=rewards,
                          logfile=logfile, qfile=qfile, efile=efile, epfile=epfile,
                          urls=params['urls'].split(','), namespace=params['namespace'],
                          max_faults=params['maxfaults'],
                          num_requests=10, timeout=2,
                          chaos_engine=params['chaos_engine'], dstk_probes=dstk_probes, command=params['command'],
                          loglevel=logging.DEBUG, chaos_experiment=cexp, iterations=params['iterations'])
        aichaos.start_chaos()
        file = open(outfile, "w")
        file.write('done')
        file.close()
        os.remove(initfile)
        # os.remove(csvfile)
        # ConstraintsInference().remove_temp_files(dir, file_id)
        return 'WRITE'


@app.route('/GenerateChaos/', methods=['POST'])
@swag_from('../config/yml/chaosGen.yml')
def chaos_gen():
    dir = flaskdir
    sw = AIChaosSwagger(flaskdir=dir)
    f = request.files['file']
    list = os.listdir(dir)
    for i in range(10000):
        if str(i) not in list:
            break
    kubeconfigfile = ''.join([dir, str(i)])
    f.save(kubeconfigfile)
    print('HEADER:', f.headers)
    print('[GenerateChaos] reqs:', request.form.to_dict())
    print('[GenerateChaos]', f.filename, datetime.now())
    # thread = threading.Thread(target=sw.write_constraints, args=(csvfile, str(i), parameters))
    thread = threading.Thread(target=sw.startchaos, args=(kubeconfigfile, str(i), request.form.to_dict()))
    thread.daemon = True
    print(thread.getName())
    thread.start()
    return 'Chaos ID: ' + str(i)


@app.route('/GetStatus/<chaosid>', methods=['GET'])
@swag_from('../config/yml/status.yml')
def get_status(chaosid):
    print('[GetStatus]', chaosid, flaskdir)
    epfile = flaskdir + 'episodes_' + str(chaosid) + '.json'
    initfile = ''.join([flaskdir, 'init-', chaosid])
    if os.path.exists(epfile):
        return 'Completed'
    elif os.path.exists(initfile):
        return 'Running'
    else:
        return 'Does not exist'


@app.route('/GetQTable/<chaosid>', methods=['GET'])
@swag_from('../config/yml/qtable.yml')
def get_qtable(chaosid):
    print('[GetQTable]', chaosid)
    qfile = flaskdir + 'qfile_' + str(chaosid) + '.csv'
    initfile = ''.join([flaskdir, 'init-', chaosid])
    if os.path.exists(qfile):
        f = open(qfile, "r")
        return f.read()
    elif os.path.exists(initfile):
        return 'Running'
    else:
        return 'Invalid Chaos ID: ' + chaosid


@app.route('/GetEpisodes/<chaosid>', methods=['GET'])
@swag_from('../config/yml/episodes.yml')
def get_episodes(chaosid):
    print('[GetEpisodes]', chaosid)
    epfile = flaskdir + 'episodes_' + str(chaosid) + '.json'
    initfile = ''.join([flaskdir, 'init-', chaosid])
    if os.path.exists(epfile):
        f = open(epfile, "r")
        return f.read()
    elif os.path.exists(initfile):
        return 'Running'
    else:
        return 'Invalid Chaos ID: ' + chaosid


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port='5001')
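
A minimal client for the endpoints above might look like the sketch below. It assumes the Flask service is already running on localhost:5001 (as in the __main__ block); the kubeconfig path, namespace, pod label and probe URL are placeholders you would replace with your own, and the sample endpoint is the one that appears in the code's comments:

import time
import requests

base = 'http://localhost:5001'

# POST the kubeconfig plus the form fields that startchaos() reads from request.form.
with open('/home/chaos/.kube/config', 'rb') as kubeconfig:   # hypothetical path
    resp = requests.post(
        base + '/GenerateChaos/',
        files={'file': kubeconfig},
        data={
            'urls': 'http://192.168.49.2:31902/api/cart/health',  # comma-separated probe URLs
            'namespace': 'robot-shop',                            # hypothetical namespace
            'podlabels': 'app=cart',                              # comma-separated pod labels
            'probeurl': 'http://localhost:8080/',                 # hypothetical probe base URL
        },
    )
print(resp.text)                                   # e.g. 'Chaos ID: 0'
chaos_id = resp.text.split(':')[-1].strip()

# Poll until the run finishes, then fetch the learned Q-table and the episodes.
while requests.get(base + '/GetStatus/' + chaos_id).text != 'Completed':
    time.sleep(30)
print(requests.get(base + '/GetQTable/' + chaos_id).text)
print(requests.get(base + '/GetEpisodes/' + chaos_id).text)

Note that the chaos engine, fault list, iteration count and maxfaults are fixed server-side (or read from /config/aichaos-config.json), so they are not part of the request.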


@@ -1,83 +0,0 @@
import json
import logging
import time

import requests


class TestApplication:
    def __init__(self, num_requests=10, timeout=2, sleep_time=1):
        self.num_requests = num_requests
        self.timeout = timeout
        self.sleep_time = sleep_time
        self.logger = logging.getLogger()

    def test_load(self, url=''):
        # url = 'http://192.168.49.2:31902/api/cart/health'
        timeout_count = 0
        avg_lat = 0
        for i in range(self.num_requests):
            try:
                r = requests.get(url, verify=False, timeout=self.timeout)
                avg_lat += r.elapsed.total_seconds()
                self.logger.info(
                    url + ' ' + str(i) + ':' + str(r.status_code) + " {:.2f}".format(r.elapsed.total_seconds())
                    + " {:.2f}".format(avg_lat))
                if r.status_code != 200:
                    return '200', r.status_code
            # except requests.exceptions.Timeout as toe:
            except Exception as toe:
                self.logger.info(url + ' ' + str(i) + ':' + 'Timeout Exception!')
                timeout_count += 1
                if timeout_count > 3:
                    return '200', 'Timeout'
            # except Exception as e:
            #     self.logger.debug('Connection refused!'+str(e))
            time.sleep(self.sleep_time)
        self.logger.info(url + "Avg: {:.2f}".format(avg_lat/self.num_requests))
        return '200', '200'

    # def test_load_hey(self):
    #     cmd = 'hey -c 2 -z 20s http://192.168.49.2:31902/api/cart/health > temp'
    #     os.system(cmd)
    #     with open('temp') as f:
    #         datafile = f.readlines()
    #     found = False
    #     for line in datafile:
    #         if 'Status code distribution:' in line:
    #             found = True
    #         if found:
    #             print('[test_load]', line)
    #             m = re.search(r"\[([A-Za-z0-9_]+)\]", line)
    #             if m is not None:
    #                 resp_code = m.group(1)
    #                 if resp_code != 200:
    #                     return '200', resp_code
    #     return '200', '200'

    # # End state is reached when system is down or return error code like '500','404'
    # def get_next_state(self):
    #     self.logger.info('[GET_NEXT_STATE]')
    #     f = open(self.chaos_dir + self.chaos_journal)
    #     data = json.load(f)
    #
    #     # before the experiment (if before steady state is false, after is null?)
    #     for probe in data['steady_states']['before']['probes']:
    #         if not probe['tolerance_met']:
    #             # start_state = probe['activity']['tolerance']
    #             # end_state = probe['status']
    #             start_state, end_state = None, None
    #             return start_state, end_state
    #
    #     # after the experiment
    #     for probe in data['steady_states']['after']['probes']:
    #         # if probe['output']['status'] == probe['activity']['tolerance']:
    #         if not probe['tolerance_met']:
    #             # print(probe)
    #             start_state = probe['activity']['tolerance']
    #             end_state = probe['output']['status']
    #             # end_state = probe['status']
    #             return start_state, end_state
    #     # if tolerances for all probes are met
    #     start_state = probe['activity']['tolerance']
    #     end_state = probe['activity']['tolerance']
    #     return start_state, end_state
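
A short usage sketch for the probe class above. test_load always returns '200' as the first element (the assumed steady state); the second element is the first non-200 status code observed, 'Timeout' after more than three timeouts, or '200' if every request succeeded. The sample URL is the one from the commented-out code, and the module name is only indicative since the original filename is not shown in this diff:

import logging

from test_application import TestApplication   # hypothetical module name

logging.basicConfig(level=logging.INFO)

tester = TestApplication(num_requests=5, timeout=2, sleep_time=1)
start_state, end_state = tester.test_load('http://192.168.49.2:31902/api/cart/health')
print(start_state, end_state)
# 200 200        every request succeeded
# 200 503        a non-200 status short-circuits the loop
# 200 Timeout    more than three requests timed out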


@@ -1,10 +0,0 @@
import re


def get_load(fault):
    params = re.findall(r'\(.*?\)', fault)
    load = 100
    if len(params) > 0:
        load = params[0].strip('()')
        fault = fault.strip(params[0])
    return fault, load
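
get_load is what inject_faults calls to split an optional parenthesised load value off a fault name. Two illustrative calls; note that the load comes back as a string when parentheses are present and as the integer 100 otherwise, and that str.strip treats its argument as a character set rather than a substring, so a fault name ending in one of those characters could be over-trimmed:

from utils import get_load   # the module is referenced as utils.get_load above

print(get_load('node-cpu-hog(75)'))   # ('node-cpu-hog', '75')
print(get_load('pod-delete'))         # ('pod-delete', 100)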


@@ -46,7 +46,7 @@ To run the recommender with a config file specify the config file path with the
You can customize the default values by editing the `recommender_config.yaml` file. The configuration file contains the following options:
- `application`: Specify the application name.
- `namespaces`: Specify the namespaces names (separated by coma or space). If you want to profile
- `namespaces`: Specify the namespaces names (separated by comma or space). If you want to profile
- `labels`: Specify the labels (not used).
- `kubeconfig`: Specify the location of the kubeconfig file (not used).
- `prometheus_endpoint`: Specify the prometheus endpoint (must).