Node egress traffic shaping

Patch adds a scenario to create variations in egress traffic of a Node's interface using the tc and Netem.
This commit is contained in:
yogananth-subramanian
2021-12-01 02:49:24 +05:30
committed by Naga Ravi Chaitanya Elluri
parent 1776a9850f
commit 50dd9873c1
14 changed files with 394 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
network_chaos: # Scenario to create an outage by simulating random variations in the network.
duration: 10 # seconds
instance_count: 1
execution: serial
egress:
bandwidth: 100mbit

View File

@@ -8,3 +8,4 @@ test_io_hog
test_mem_hog
test_cpu_hog
test_shut_down
test_net_chaos

19
CI/tests/test_net_chaos.sh Executable file
View File

@@ -0,0 +1,19 @@
set -xeEo pipefail
source CI/tests/common.sh
trap error ERR
trap finish EXIT
# Render the shared CI config for the network chaos scenario and run Kraken
# against it. Relies on the surrounding `set -e` / ERR trap to abort on failure,
# so the success message is only printed when Kraken exits cleanly.
functional_test_network_chaos() {
  # These variables are substituted into the common config template by envsubst.
  export scenario_type="network_chaos"
  export scenario_file="CI/scenarios/network_chaos.yaml"
  export post_config=""

  envsubst < CI/config/common_test_config.yaml > CI/config/network_chaos.yaml

  python3 run_kraken.py -c CI/config/network_chaos.yaml
  echo "Network Chaos test: Success"
}
functional_test_network_chaos

View File

@@ -63,6 +63,8 @@ Instructions on how to setup the config and the options supported can be found a
- [PVC scenario](docs/pvc_scenario.md)
- [Network_Chaos](docs/network_chaos.md)
### Kraken scenario pass/fail criteria and report
It's important to make sure to check if the targeted component recovered from the chaos injection and also if the Kubernetes/OpenShift cluster is healthy as failures in one component can have an adverse impact on other components. Kraken does this by:

View File

@@ -41,6 +41,8 @@ kraken:
- scenarios/app_outage.yaml
- pvc_scenarios:
- scenarios/pvc_scenario.yaml
- network_chaos:
- scenarios/network_chaos.yaml
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed

View File

@@ -34,6 +34,8 @@ kraken:
- scenarios/app_outage.yaml
- pvc_scenarios:
- scenarios/pvc_scenario.yaml
- network_chaos:
- scenarios/network_chaos.yaml
cerberus:
cerberus_enabled: True # Enable it when cerberus is previously installed
cerberus_url: http://0.0.0.0:8080 # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal

26
docs/network_chaos.md Normal file
View File

@@ -0,0 +1,26 @@
### Network chaos
Scenario to introduce network latency, packet loss, and bandwidth restriction on the Node's host network interface. The purpose of this scenario is to observe faults caused by random variations in the network.
##### Sample scenario config
```
network_chaos: # Scenario to create an outage by simulating random variations in the network.
duration: 300 # in seconds - duration for which network chaos will be applied.
node_name: # comma separated node names on which scenario has to be injected.
label_selector: node-role.kubernetes.io/master # when node_name is not specified, a node with matching label_selector is selected for running the scenario.
instance_count: 1 # Number of nodes to execute network chaos in.
interfaces: # List of interface on which to apply the network restriction.
- "ens5" # Interface name would be the Kernel host network interface name.
execution: serial|parallel # Apply all the egress options together as a single scenario (parallel) or each option as a separate scenario (serial).
egress:
latency: 50ms
loss: 0.02 # percentage
bandwidth: 100mbit
```
##### Steps
- Pick the nodes to introduce the network anomaly either from node_name or label_selector.
- Verify the interface list on one of the nodes; if no interface is specified by the user, use the interface with the default route as the test interface.
- Set traffic shaping config on node's interface using tc and netem.
- Wait for the duration time.
- Remove traffic shaping config on node's interface.
- Remove the job that spawned the pod.

View File

@@ -5,6 +5,7 @@ import logging
import kraken.invoke.command as runcommand
import sys
import re
import time
kraken_node_name = ""
@@ -12,9 +13,11 @@ kraken_node_name = ""
# Load kubeconfig and initialize kubernetes python client
def initialize_clients(kubeconfig_path):
global cli
global batch_cli
try:
config.load_kube_config(kubeconfig_path)
cli = client.CoreV1Api()
batch_cli = client.BatchV1Api()
except ApiException as e:
logging.error("Failed to initialize kubernetes client: %s\n" % e)
sys.exit(1)
@@ -164,6 +167,45 @@ def exec_cmd_in_pod(command, pod_name, namespace, container=None):
return ret
def delete_pod(name, namespace):
    """Delete a pod and wait until the API server stops returning it.

    The call is best effort: API errors are logged, never raised, so it is safe
    to use from cleanup paths.

    Args:
        name: Name of the pod to delete.
        namespace: Namespace the pod lives in.
    """
    try:
        cli.delete_namespaced_pod(name=name, namespace=namespace)
        # Poll until reading the pod fails; the loop is expected to end through
        # the ApiException raised once the pod no longer exists.
        while cli.read_namespaced_pod(name=name, namespace=namespace):
            time.sleep(1)
    except ApiException as e:
        if e.status == 404:
            logging.info("Pod already deleted")
        else:
            # Anything other than "not found" is a real failure; do not report
            # it as a successful deletion.
            logging.error("Failed to delete pod %s in namespace %s: %s" % (name, namespace, e))
def create_pod(body, namespace, timeout=120):
    """Create a pod and block until it reports the "Running" phase.

    Args:
        body: Pod manifest as a dict; ``body["metadata"]["name"]`` is used to
            poll (and, on failure, delete) the pod.
        namespace: Namespace to create the pod in.
        timeout: Seconds to wait for the pod to reach "Running".

    On any failure the error is logged, the pod is deleted and the process
    exits with status 1.
    """
    pod_stat = None
    try:
        pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace)
        deadline = time.time() + timeout
        pod_stat = cli.read_namespaced_pod(name=body["metadata"]["name"], namespace=namespace)
        while pod_stat.status.phase != "Running":
            if time.time() > deadline:
                raise Exception("Starting pod failed")
            time.sleep(1)
            pod_stat = cli.read_namespaced_pod(name=body["metadata"]["name"], namespace=namespace)
    except Exception as err:
        logging.error("Pod creation failed %s" % err)
        # pod_stat is only set once the API returned something to inspect.
        if pod_stat:
            logging.error(pod_stat.status.container_statuses)
        delete_pod(body["metadata"]["name"], namespace)
        sys.exit(1)
def read_pod(name, namespace="default"):
    """Return the pod object the API server reports for ``name`` in ``namespace``."""
    pod = cli.read_namespaced_pod(name=name, namespace=namespace)
    return pod
def get_pod_log(name, namespace="default"):
    """Return the raw (non-preloaded) HTTP response carrying the pod's log.

    Args:
        name: Name of the pod.
        namespace: Namespace the pod lives in.
    """
    # Ask the client for the undecoded response so the caller reads `.data` itself.
    request_options = {"_return_http_data_only": True, "_preload_content": False}
    return cli.read_namespaced_pod_log(name=name, namespace=namespace, **request_options)
def get_containers_in_pod(pod_name, namespace):
pod_info = cli.read_namespaced_pod(pod_name, namespace)
container_names = []
@@ -173,6 +215,64 @@ def get_containers_in_pod(pod_name, namespace):
return container_names
def delete_job(name, namespace="default"):
    """Delete a Job with foreground propagation and no grace period.

    Args:
        name: Name of the Job to delete.
        namespace: Namespace the Job lives in.

    Returns:
        The API response on success, or None when the API call raised an
        ApiException (which is logged as a warning, not raised).

    Any other exception is logged and terminates the process with status 1.
    """
    try:
        api_response = batch_cli.delete_namespaced_job(
            name=name,
            namespace=namespace,
            body=client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=0),
        )
        logging.debug("Job deleted. status='%s'" % str(api_response.status))
        return api_response
    except ApiException as api:
        logging.warning("Exception when calling BatchV1Api->delete_namespaced_job: %s" % api)
        # Only a 404 actually means the job is gone already.
        if api.status == 404:
            logging.warning("Job already deleted\n")
    except Exception as e:
        logging.error("Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e)
        sys.exit(1)
def create_job(body, namespace="default"):
    """Create a Job from a manifest.

    Args:
        body: Job manifest to submit.
        namespace: Namespace to create the Job in.

    Returns:
        The API response on success, or None when the API call raised an
        ApiException (for example a 409 because the Job already exists).
        Callers must treat None as "job was not created".

    Raises:
        Exception: Any non-API exception is logged and re-raised.
    """
    try:
        return batch_cli.create_namespaced_job(body=body, namespace=namespace)
    except ApiException as api:
        logging.warning("Exception when calling BatchV1Api->create_namespaced_job: %s" % api)
        if api.status == 409:
            logging.warning("Job already present")
    except Exception as e:
        logging.error("Exception when calling BatchV1Api->create_namespaced_job: %s" % e)
        raise
def get_job_status(name, namespace="default"):
    """Return the status object of a Job, logging and re-raising any failure.

    Args:
        name: Name of the Job.
        namespace: Namespace the Job lives in.
    """
    try:
        job = batch_cli.read_namespaced_job_status(name=name, namespace=namespace)
    except Exception as err:
        logging.error("Exception when calling BatchV1Api->read_namespaced_job_status: %s" % err)
        raise
    else:
        return job
# Obtain node status
def get_node_status(node):
try:

View File

View File

@@ -0,0 +1,176 @@
import yaml
import logging
import time
import sys
import os
import random
from jinja2 import Environment, FileSystemLoader
import kraken.cerberus.setup as cerberus
import kraken.kubernetes.client as kubecli
import kraken.node_actions.common_node_functions as common_node_functions
# Reads the scenario config and introduces traffic variations in Node's host network interface.
def run(scenarios_list, config, wait_duration):
    """Run each network chaos scenario file against the selected nodes.

    For every scenario file this reads the ``network_chaos`` section, resolves
    the target nodes and interfaces, creates one Job per node that applies the
    egress shaping, waits for the Jobs, publishes the status to cerberus and
    finally deletes the Jobs.

    Args:
        scenarios_list: Paths of scenario YAML files to execute.
        config: Kraken config, passed through to cerberus.publish_kraken_status.
        wait_duration: Seconds to sleep after the jobs of a run have finished.

    Any exception while creating or waiting for jobs is logged and terminates
    the process with status 1 (after the jobs have been deleted).
    """
    failed_post_scenarios = ""
    logging.info("Runing the Network Chaos tests")
    # Egress options that can be translated into netem arguments.
    param_lst = ["latency", "loss", "bandwidth"]
    for net_config in scenarios_list:
        # Only parsing needs the file handle; release it before the long run.
        with open(net_config, "r") as file:
            test_config = yaml.safe_load(file)
        test_dict = test_config["network_chaos"]
        test_duration = int(test_dict.get("duration", 300))
        # `or` also covers keys present but left empty in the YAML (parsed as None).
        test_interface = test_dict.get("interfaces") or []
        test_node = test_dict.get("node_name", "")
        test_node_label = test_dict.get("label_selector", "node-role.kubernetes.io/master")
        test_execution = test_dict.get("execution", "serial")
        test_instance_count = test_dict.get("instance_count", 1)
        test_egress = test_dict.get("egress") or {"bandwidth": "100mbit"}
        if test_node:
            node_name_list = test_node.split(",")
        else:
            node_name_list = [test_node]
        nodelst = []
        for single_node_name in node_name_list:
            nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count))
        file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
        env = Environment(loader=file_loader)
        pod_template = env.get_template("pod.j2")
        test_interface = verify_interface(test_interface, nodelst, pod_template)
        joblst = []
        egress_lst = [i for i in param_lst if i in test_egress]
        chaos_config = {
            "network_chaos": {
                "duration": test_duration,
                "interfaces": test_interface,
                "node_name": ",".join(nodelst),
                "execution": test_execution,
                "instance_count": test_instance_count,
                "egress": test_egress,
            }
        }
        logging.info("Executing network chaos with config \n %s" % yaml.dump(chaos_config))
        job_template = env.get_template("job.j2")
        try:
            for i in egress_lst:
                for node in nodelst:
                    # Use the defaulted egress dict so a scenario without an
                    # explicit `egress` section does not raise KeyError.
                    exec_cmd = get_egress_cmd(test_execution, test_interface, i, test_egress, duration=test_duration)
                    logging.info("Executing %s on node %s" % (exec_cmd, node))
                    job_body = yaml.safe_load(
                        job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
                    )
                    joblst.append(job_body["metadata"]["name"])
                    api_response = kubecli.create_job(job_body)
                    if api_response is None:
                        raise Exception("Error creating job")
                if test_execution == "serial":
                    logging.info("Waiting for serial job to finish")
                    start_time = int(time.time())
                    wait_for_job(joblst[:], test_duration + 300)
                    logging.info("Waiting for wait_duration %s" % wait_duration)
                    time.sleep(wait_duration)
                    end_time = int(time.time())
                    cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
                if test_execution == "parallel":
                    # One job per node already carries every egress option.
                    break
            if test_execution == "parallel":
                logging.info("Waiting for parallel job to finish")
                start_time = int(time.time())
                wait_for_job(joblst[:], test_duration + 300)
                logging.info("Waiting for wait_duration %s" % wait_duration)
                time.sleep(wait_duration)
                end_time = int(time.time())
                cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
        except Exception as e:
            logging.error("Network Chaos exiting due to Exception %s" % e)
            sys.exit(1)
        finally:
            logging.info("Deleting jobs")
            delete_job(joblst[:])
def verify_interface(test_interface, nodelst, template):
    """Resolve or validate the interfaces to shape, using a helper pod on one node.

    A pod rendered from ``template`` is started on a randomly chosen node from
    ``nodelst``. When no interface was requested, the interface of the default
    route is returned; otherwise every requested interface must exist on that
    node or the process exits with status 1. The helper pod is always deleted.

    Args:
        test_interface: Requested interface names; empty or None selects the
            default-route interface.
        nodelst: Names of candidate nodes.
        template: Jinja template of the helper pod, rendered with ``nodename``.

    Returns:
        The list of interface names to use.
    """
    pod_index = random.randint(0, len(nodelst) - 1)
    node = nodelst[pod_index]
    pod_body = yaml.safe_load(template.render(nodename=node))
    logging.info("Creating pod to query interface on node %s" % node)
    kubecli.create_pod(pod_body, "default", 300)
    try:
        if not test_interface:
            cmd = "ip r | grep default | awk '/default/ {print $5}'"
            output = kubecli.exec_cmd_in_pod(cmd, "fedtools", "default")
            test_interface = [output.replace("\n", "")]
        else:
            cmd = "ip -br addr show|awk -v ORS=',' '{print $1}'"
            output = kubecli.exec_cmd_in_pod(cmd, "fedtools", "default")
            # The awk ORS leaves a trailing comma; drop it before splitting.
            interface_lst = output[:-1].split(",")
            for interface in test_interface:
                if interface not in interface_lst:
                    logging.error(
                        "Interface %s not found in node %s interface list %s" % (interface, node, interface_lst)
                    )
                    sys.exit(1)
        return test_interface
    finally:
        logging.info("Deleteing pod to query interface on node")
        kubecli.delete_pod("fedtools", "default")
def get_job_pods(api_response):
    """Return the first pod listed for the Job described by ``api_response``.

    Pods are looked up in the "default" namespace through the Job's
    ``controller-uid`` label.
    """
    selector = "controller-uid=" + api_response.metadata.labels["controller-uid"]
    matching_pods = kubecli.list_pods(label_selector=selector, namespace="default")
    return matching_pods[0]
def wait_for_job(joblst, timeout=300):
    """Block until every Job in ``joblst`` has succeeded or failed.

    Jobs are polled in the "default" namespace; the timeout is checked and a
    5 second pause is taken once per polling pass. Errors while reading a
    Job's status are logged and the Job is simply polled again.

    Args:
        joblst: Names of the Jobs to wait for. The list is not modified.
        timeout: Maximum number of seconds to wait.

    Raises:
        Exception: If some Jobs are still unfinished after ``timeout`` seconds.
    """
    deadline = time.time() + timeout
    # Work on a private copy: never mutate the caller's list, and never remove
    # from a list while iterating over it.
    pending = list(joblst)
    while pending:
        for jobname in list(pending):
            try:
                api_response = kubecli.get_job_status(jobname, namespace="default")
                if api_response.status.succeeded is not None or api_response.status.failed is not None:
                    pending.remove(jobname)
            except Exception:
                logging.warning("Exception in getting job status")
        if not pending:
            break
        if time.time() > deadline:
            raise Exception("Timed out waiting for jobs to finish: %s" % ", ".join(map(str, pending)))
        time.sleep(5)
def delete_job(joblst):
    """Delete the given Jobs, logging pod details first for any that failed.

    For each Job in the "default" namespace, the status is read; when it
    reports failures, the container statuses and log of the Job's pod are
    logged as errors. The Job is deleted regardless of whether that
    diagnostic step worked.

    Args:
        joblst: Names of the Jobs to delete.
    """
    for jobname in joblst:
        try:
            api_response = kubecli.get_job_status(jobname, namespace="default")
            if api_response.status.failed is not None:
                pod_name = get_job_pods(api_response)
                pod_stat = kubecli.read_pod(name=pod_name, namespace="default")
                logging.error(pod_stat.status.container_statuses)
                pod_log_response = kubecli.get_pod_log(name=pod_name, namespace="default")
                pod_log = pod_log_response.data.decode("utf-8")
                logging.error(pod_log)
        except Exception:
            # Diagnostics are best effort; still go on to delete the job.
            logging.warning("Exception in getting job status")
        kubecli.delete_job(name=jobname, namespace="default")
def get_egress_cmd(execution, test_interface, mod, vallst, duration=30):
    """Build the shell command that applies, holds and removes egress shaping.

    The command adds a netem qdisc on every interface, lists the qdiscs,
    sleeps for ``duration`` seconds, deletes the qdiscs, sleeps 20 seconds and
    lists the qdiscs again.

    Args:
        execution: "parallel" applies every supported option in ``vallst`` at
            once; any other value applies only the option named by ``mod``.
        test_interface: Iterable of interface names to shape.
        mod: Egress option to apply when not running in parallel; one of
            "latency", "loss" or "bandwidth".
        vallst: Mapping of egress option name to its value.
        duration: Seconds to keep the shaping in place.

    Returns:
        The command string.

    Raises:
        KeyError: If, outside parallel mode, ``mod`` is not a supported option
            or is missing from ``vallst``.
    """
    # Scenario option name -> netem keyword.
    param_map = {"latency": "delay", "loss": "loss", "bandwidth": "rate"}

    # The netem arguments are identical for every interface; build them once.
    if execution == "parallel":
        unsupported = [key for key in vallst if key not in param_map]
        if unsupported:
            # Unknown options cannot be mapped to netem; skip them rather
            # than failing the whole scenario with a KeyError.
            logging.warning("Ignoring unsupported egress options: %s" % ", ".join(map(str, unsupported)))
        netem_args = "".join(
            " {0} {1} ".format(param_map[key], vallst[key]) for key in vallst if key in param_map
        )
        netem_args += ";"
    else:
        netem_args = " {0} {1} ;".format(param_map[mod], vallst[mod])

    tc_set = tc_unset = tc_ls = ""
    for interface in test_interface:
        tc_set = "{0} tc qdisc add dev {1} root netem{2}".format(tc_set, interface, netem_args)
        tc_unset = "{0} tc qdisc del dev {1} root ;".format(tc_unset, interface)
        tc_ls = "{0} tc qdisc ls dev {1} ;".format(tc_ls, interface)

    return "{0} {1} sleep {2};{3} sleep 20;{4}".format(tc_set, tc_ls, duration, tc_unset, tc_ls)

View File

@@ -0,0 +1,25 @@
apiVersion: batch/v1
kind: Job
metadata:
name: chaos-{{jobname}}
spec:
template:
spec:
nodeName: {{nodename}}
hostNetwork: true
containers:
- name: networkchaos
image: docker.io/fedora/tools
command: ["/bin/sh", "-c", "{{cmd}}"]
securityContext:
privileged: true
volumeMounts:
- mountPath: /lib/modules
name: lib-modules
readOnly: true
volumes:
- name: lib-modules
hostPath:
path: /lib/modules
restartPolicy: Never
backoffLimit: 0

View File

@@ -0,0 +1,17 @@
apiVersion: v1
kind: Pod
metadata:
name: fedtools
spec:
hostNetwork: true
nodeName: {{nodename}}
containers:
- name: fedtools
image: docker.io/fedora/tools
command:
- /bin/sh
- -c
- |
sleep infinity
securityContext:
privileged: true

View File

@@ -21,6 +21,7 @@ import kraken.kube_burner.client as kube_burner
import kraken.zone_outage.actions as zone_outages
import kraken.application_outage.actions as application_outage
import kraken.pvc.pvc_scenario as pvc_scenario
import kraken.network_chaos.actions as network_chaos
import server as server
@@ -217,6 +218,11 @@ def main(cfg):
logging.info("Running PVC scenario")
pvc_scenario.run(scenarios_list, config)
# Network scenarios
elif scenario_type == "network_chaos":
logging.info("Running Network Chaos")
network_chaos.run(scenarios_list, config, wait_duration)
iteration += 1
logging.info("")

View File

@@ -0,0 +1,12 @@
network_chaos: # Scenario to create an outage by simulating random variations in the network.
duration: 300 # seconds
node_name: # node on which scenario has to be injected;
label_selector: <label_selector> # when node_name is not specified, a node with matching label_selector is selected for running the scenario.
instance_count: 1
interfaces: # Interface name would be the Kernel host network interface name.
- "<interface_name>"
execution: serial
egress:
latency: 50ms # 50ms
loss: 0.02 # percentage
bandwidth: 100mbit