removing kubernetes functions (#1205)

Signed-off-by: Paige Patton <prubenda@redhat.com>
This commit is contained in:
Paige Patton
2026-03-31 07:46:40 -05:00
committed by GitHub
parent 8c57b0956b
commit 626e203d33
5 changed files with 278 additions and 909 deletions

View File

@@ -8,11 +8,9 @@ import re
import random
from traceback import format_exc
from jinja2 import Environment, FileSystemLoader
from . import kubernetes_functions as kube_helper
from krkn_lib.k8s import KrknKubernetes
import typing
from arcaflow_plugin_sdk import validation, plugin
from kubernetes.client.api.core_v1_api import CoreV1Api as CoreV1Api
from kubernetes.client.api.batch_v1_api import BatchV1Api as BatchV1Api
@dataclass
@@ -150,7 +148,7 @@ class NetworkScenarioErrorOutput:
)
def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -> str:
def get_default_interface(node: str, pod_template, kubecli: KrknKubernetes, image: str) -> str:
"""
Function that returns a random interface from a node
@@ -162,20 +160,20 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -
- The YAML template used to instantiate a pod to query
the node's interface
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
Returns:
Default interface (string) belonging to the node
"""
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"fedtools-{pod_name_regex}"
try:
cmd = ["ip", "r"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -191,13 +189,13 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -
finally:
logging.info("Deleting pod to query interface on node")
kube_helper.delete_pod(cli, pod_name, "default")
kubecli.delete_pod(pod_name, "default")
return interfaces
def verify_interface(
input_interface_list: typing.List[str], node: str, pod_template, cli: CoreV1Api, image: str
input_interface_list: typing.List[str], node: str, pod_template, kubecli: KrknKubernetes, image: str
) -> typing.List[str]:
"""
Function that verifies whether a list of interfaces is present in the node.
@@ -214,21 +212,21 @@ def verify_interface(
- The YAML template used to instantiate a pod to query
the node's interfaces
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
Returns:
The interface list for the node
"""
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
logging.info("Creating pod to query interface on node %s" % node)
kube_helper.create_pod(cli, pod_body, "default", 300)
kubecli.create_pod(pod_body, "default", 300)
pod_name = f"fedtools-{pod_name_regex}"
try:
if input_interface_list == []:
cmd = ["ip", "r"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -244,7 +242,7 @@ def verify_interface(
else:
cmd = ["ip", "-br", "addr", "show"]
output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default")
output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default")
if not output:
logging.error("Exception occurred while executing command in pod")
@@ -267,7 +265,7 @@ def verify_interface(
)
finally:
logging.info("Deleting pod to query interface on node")
kube_helper.delete_pod(cli, pod_name, "default")
kubecli.delete_pod(pod_name, "default")
return input_interface_list
@@ -277,7 +275,7 @@ def get_node_interfaces(
label_selector: str,
instance_count: int,
pod_template,
cli: CoreV1Api,
kubecli: KrknKubernetes,
image: str
) -> typing.Dict[str, typing.List[str]]:
"""
@@ -305,8 +303,8 @@ def get_node_interfaces(
- The YAML template used to instantiate a pod to query
the node's interfaces
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
Returns:
Filtered dictionary containing the test nodes and their test interfaces
@@ -317,22 +315,22 @@ def get_node_interfaces(
"If node names and interfaces aren't provided, "
"then the label selector must be provided"
)
nodes = kube_helper.get_node(None, label_selector, instance_count, cli)
nodes = kubecli.get_node(None, label_selector, instance_count)
node_interface_dict = {}
for node in nodes:
node_interface_dict[node] = get_default_interface(node, pod_template, cli, image)
node_interface_dict[node] = get_default_interface(node, pod_template, kubecli, image)
else:
node_name_list = node_interface_dict.keys()
filtered_node_list = []
for node in node_name_list:
filtered_node_list.extend(
kube_helper.get_node(node, label_selector, instance_count, cli)
kubecli.get_node(node, label_selector, instance_count)
)
for node in filtered_node_list:
node_interface_dict[node] = verify_interface(
node_interface_dict[node], node, pod_template, cli, image
node_interface_dict[node], node, pod_template, kubecli, image
)
return node_interface_dict
@@ -344,11 +342,10 @@ def apply_ingress_filter(
node: str,
pod_template,
job_template,
batch_cli: BatchV1Api,
cli: CoreV1Api,
kubecli: KrknKubernetes,
create_interfaces: bool = True,
param_selector: str = "all",
image:str = "quay.io/krkn-chaos/krkn:tools",
image: str = "quay.io/krkn-chaos/krkn:tools",
) -> str:
"""
Function that applies the filters to shape incoming traffic to
@@ -374,11 +371,8 @@ def apply_ingress_filter(
- The YAML template used to instantiate a job to apply and remove
the filters on the interfaces
batch_cli
- Object to interact with Kubernetes Python client's BatchV1 API
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
param_selector (string)
- Used to specify what kind of filter to apply. Useful during
@@ -394,7 +388,7 @@ def apply_ingress_filter(
network_params = {param_selector: cfg.network_params[param_selector]}
if create_interfaces:
create_virtual_interfaces(cli, interface_list, node, pod_template, image)
create_virtual_interfaces(kubecli, interface_list, node, pod_template, image)
exec_cmd = get_ingress_cmd(
interface_list, network_params, duration=cfg.test_duration
@@ -403,7 +397,7 @@ def apply_ingress_filter(
job_body = yaml.safe_load(
job_template.render(jobname=str(hash(node))[:5], nodename=node, image=image, cmd=exec_cmd)
)
api_response = kube_helper.create_job(batch_cli, job_body)
api_response = kubecli.create_job(job_body)
if api_response is None:
raise Exception("Error creating job")
@@ -412,15 +406,15 @@ def apply_ingress_filter(
def create_virtual_interfaces(
cli: CoreV1Api, interface_list: typing.List[str], node: str, pod_template, image: str
kubecli: KrknKubernetes, interface_list: typing.List[str], node: str, pod_template, image: str
) -> None:
"""
Function that creates a privileged pod and uses it to create
virtual interfaces on the node
Args:
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
interface_list (List of strings)
- The list of interfaces on the node for which virtual interfaces
@@ -434,37 +428,34 @@ def create_virtual_interfaces(
virtual interfaces on the node
"""
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
logging.info(
"Creating {0} virtual interfaces on node {1} using a pod".format(
len(interface_list), node
)
)
pod_name = f"modtools-{pod_name_regex}"
create_ifb(cli, len(interface_list), pod_name)
create_ifb(kubecli, len(interface_list), pod_name)
logging.info("Deleting pod used to create virtual interfaces")
kube_helper.delete_pod(cli, pod_name, "default")
kubecli.delete_pod(pod_name, "default")
def delete_virtual_interfaces(
cli: CoreV1Api, node_list: typing.List[str], pod_template, image: str
kubecli: KrknKubernetes, node_list: typing.List[str], pod_template, image: str
):
"""
Function that creates a privileged pod and uses it to delete all
virtual interfaces on the specified nodes
Args:
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
node_list (List of strings)
- The list of nodes on which the list of virtual interfaces are
to be deleted
node (string)
- The node on which the virtual interfaces are created
pod_template (jinja2.environment.Template))
- The YAML template used to instantiate a pod to delete
virtual interfaces on the node
@@ -472,46 +463,45 @@ def delete_virtual_interfaces(
for node in node_list:
pod_name_regex = str(random.randint(0, 10000))
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image))
kube_helper.create_pod(cli, pod_body, "default", 300)
pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image))
kubecli.create_pod(pod_body, "default", 300)
logging.info("Deleting all virtual interfaces on node {0}".format(node))
pod_name = f"modtools-{pod_name_regex}"
delete_ifb(cli, pod_name)
kube_helper.delete_pod(cli, pod_name, "default")
delete_ifb(kubecli, pod_name)
kubecli.delete_pod(pod_name, "default")
def create_ifb(cli: CoreV1Api, number: int, pod_name: str):
def create_ifb(kubecli: KrknKubernetes, number: int, pod_name: str):
"""
Function that creates virtual interfaces in a pod.
Makes use of modprobe commands
"""
exec_command = ["chroot", "/host", "modprobe", "ifb", "numifbs=" + str(number)]
kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default")
exec_command = ["/host", "modprobe", "ifb", "numifbs=" + str(number)]
kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot")
for i in range(0, number):
exec_command = ["chroot", "/host", "ip", "link", "set", "dev"]
exec_command += ["ifb" + str(i), "up"]
kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default")
exec_command = ["/host", "ip", "link", "set", "dev", "ifb" + str(i), "up"]
kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot")
def delete_ifb(cli: CoreV1Api, pod_name: str):
def delete_ifb(kubecli: KrknKubernetes, pod_name: str):
"""
Function that deletes all virtual interfaces in a pod.
Makes use of modprobe command
"""
exec_command = ["chroot", "/host", "modprobe", "-r", "ifb"]
kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default")
exec_command = ["/host", "modprobe", "-r", "ifb"]
kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot")
def get_job_pods(cli: CoreV1Api, api_response):
def get_job_pods(kubecli: KrknKubernetes, api_response):
"""
Function that gets the pod corresponding to the job
Args:
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
api_response
- The API response for the job status
@@ -522,22 +512,22 @@ def get_job_pods(cli: CoreV1Api, api_response):
controllerUid = api_response.metadata.labels["controller-uid"]
pod_label_selector = "controller-uid=" + controllerUid
pods_list = kube_helper.list_pods(
cli, label_selector=pod_label_selector, namespace="default"
pods_list = kubecli.list_pods(
label_selector=pod_label_selector, namespace="default"
)
return pods_list[0]
def wait_for_job(
batch_cli: BatchV1Api, job_list: typing.List[str], timeout: int = 300
kubecli: KrknKubernetes, job_list: typing.List[str], timeout: int = 300
) -> None:
"""
Function that waits for a list of jobs to finish within a time period
Args:
batch_cli (BatchV1Api)
- Object to interact with Kubernetes Python client's BatchV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
job_list (List of strings)
- The list of jobs to check for completion
@@ -552,9 +542,7 @@ def wait_for_job(
while count != job_len:
for job_name in job_list:
try:
api_response = kube_helper.get_job_status(
batch_cli, job_name, namespace="default"
)
api_response = kubecli.get_job_status(job_name, namespace="default")
if (
api_response.status.succeeded is not None
or api_response.status.failed is not None
@@ -571,16 +559,13 @@ def wait_for_job(
time.sleep(5)
def delete_jobs(cli: CoreV1Api, batch_cli: BatchV1Api, job_list: typing.List[str]):
def delete_jobs(kubecli: KrknKubernetes, job_list: typing.List[str]):
"""
Function that deletes jobs
Args:
cli (CoreV1Api)
- Object to interact with Kubernetes Python client's CoreV1 API
batch_cli (BatchV1Api)
- Object to interact with Kubernetes Python client's BatchV1 API
kubecli (KrknKubernetes)
- Object to interact with Kubernetes Python client
job_list (List of strings)
- The list of jobs to delete
@@ -588,23 +573,19 @@ def delete_jobs(cli: CoreV1Api, batch_cli: BatchV1Api, job_list: typing.List[str
for job_name in job_list:
try:
api_response = kube_helper.get_job_status(
batch_cli, job_name, namespace="default"
)
api_response = kubecli.get_job_status(job_name, namespace="default")
if api_response.status.failed is not None:
pod_name = get_job_pods(cli, api_response)
pod_stat = kube_helper.read_pod(cli, name=pod_name, namespace="default")
pod_name = get_job_pods(kubecli, api_response)
pod_stat = kubecli.read_pod(name=pod_name, namespace="default")
logging.error(pod_stat.status.container_statuses)
pod_log_response = kube_helper.get_pod_log(
cli, name=pod_name, namespace="default"
pod_log_response = kubecli.get_pod_log(
name=pod_name, namespace="default"
)
pod_log = pod_log_response.data.decode("utf-8")
logging.error(pod_log)
except Exception as e:
logging.warning("Exception in getting job status: %s" % str(e))
api_response = kube_helper.delete_job(
batch_cli, name=job_name, namespace="default"
)
kubecli.delete_job(name=job_name, namespace="default")
def get_ingress_cmd(
@@ -715,7 +696,7 @@ def network_chaos(
job_template = env.get_template("job.j2")
pod_interface_template = env.get_template("pod_interface.j2")
pod_module_template = env.get_template("pod_module.j2")
cli, batch_cli = kube_helper.setup_kubernetes(cfg.kubeconfig_path)
kubecli = KrknKubernetes(kubeconfig_path=cfg.kubeconfig_path)
test_image = cfg.image
logging.info("Starting Ingress Network Chaos")
try:
@@ -724,7 +705,7 @@ def network_chaos(
cfg.label_selector,
cfg.instance_count,
pod_interface_template,
cli,
kubecli,
test_image
)
except Exception:
@@ -741,13 +722,12 @@ def network_chaos(
node,
pod_module_template,
job_template,
batch_cli,
cli,
test_image
kubecli,
image=test_image
)
)
logging.info("Waiting for parallel job to finish")
wait_for_job(batch_cli, job_list[:], cfg.test_duration + 100)
wait_for_job(kubecli, job_list[:], cfg.test_duration + 100)
elif cfg.execution_type == "serial":
create_interfaces = True
@@ -760,22 +740,20 @@ def network_chaos(
node,
pod_module_template,
job_template,
batch_cli,
cli,
kubecli,
create_interfaces=create_interfaces,
param_selector=param,
image=test_image
)
)
logging.info("Waiting for serial job to finish")
wait_for_job(batch_cli, job_list[:], cfg.test_duration + 100)
wait_for_job(kubecli, job_list[:], cfg.test_duration + 100)
logging.info("Deleting jobs")
delete_jobs(cli, batch_cli, job_list[:])
delete_jobs(kubecli, job_list[:])
job_list = []
create_interfaces = False
else:
return "error", NetworkScenarioErrorOutput(
"Invalid execution type - serial and parallel are "
"the only accepted types"
@@ -790,6 +768,6 @@ def network_chaos(
logging.error("Ingress Network Chaos exiting due to Exception - %s" % e)
return "error", NetworkScenarioErrorOutput(format_exc())
finally:
delete_virtual_interfaces(cli, node_interface_dict.keys(), pod_module_template, test_image)
delete_virtual_interfaces(kubecli, node_interface_dict.keys(), pod_module_template, test_image)
logging.info("Deleting jobs(if any)")
delete_jobs(cli, batch_cli, job_list[:])
delete_jobs(kubecli, job_list[:])

View File

@@ -1,284 +0,0 @@
from kubernetes import config, client
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream
import sys
import time
import logging
import random
def setup_kubernetes(kubeconfig_path):
    """
    Sets up the Kubernetes clients used by the network scenarios

    Args:
        kubeconfig_path
            - Path of the kubeconfig file to load; when None, the client
              library's default kubeconfig location is used instead

    Returns:
        Tuple of (CoreV1Api, BatchV1Api) client objects
    """
    config_file = (
        config.KUBE_CONFIG_DEFAULT_LOCATION
        if kubeconfig_path is None
        else kubeconfig_path
    )
    config.load_kube_config(config_file)
    core_api = client.CoreV1Api()
    batch_api = client.BatchV1Api()
    return core_api, batch_api
def create_job(batch_cli, body, namespace="default"):
    """
    Function used to create a job from an already-parsed YAML config

    Args:
        batch_cli
            - Client object exposing create_namespaced_job
        body
            - Job manifest, handed to the API unchanged
        namespace (string)
            - Namespace in which the job is created

    Returns:
        The API response on success. None when the API call raises an
        ApiException (the error is only logged; a 409 status is additionally
        reported as "Job already present").

    Raises:
        Any other exception, re-raised after being logged.
    """
    try:
        return batch_cli.create_namespaced_job(body=body, namespace=namespace)
    except ApiException as api_error:
        logging.warning(
            "Exception when calling BatchV1Api->create_job: %s", api_error
        )
        already_exists = api_error.status == 409
        if already_exists:
            logging.warning("Job already present")
    except Exception as error:
        logging.error(
            "Exception when calling BatchV1Api->create_namespaced_job: %s", error
        )
        raise
def delete_pod(cli, name, namespace):
    """
    Function that deletes a pod and waits until deletion is complete

    The pod is re-read once per second for as long as the read call returns
    a truthy object. A 404 ApiException is treated as "pod is gone".

    Args:
        cli
            - Client object exposing delete_namespaced_pod and
              read_namespaced_pod
        name (string)
            - Name of the pod to delete
        namespace (string)
            - Namespace the pod lives in

    Raises:
        ApiException with any status other than 404, after logging it.
    """
    try:
        cli.delete_namespaced_pod(name=name, namespace=namespace)
        while True:
            still_there = cli.read_namespaced_pod(name=name, namespace=namespace)
            if not still_there:
                break
            time.sleep(1)
    except ApiException as api_error:
        if api_error.status != 404:
            logging.error("Failed to delete pod %s" % api_error)
            raise api_error
        logging.info("Pod deleted")
def create_pod(cli, body, namespace, timeout=120):
    """
    Function used to create a pod from a YAML config and wait for it to run

    The pod is polled once per second until its phase is "Running" or the
    timeout has elapsed.

    Args:
        cli
            - Client object exposing create_namespaced_pod and
              read_namespaced_pod
        body (dict)
            - Pod manifest; body["metadata"]["name"] is used to poll the pod
        namespace (string)
            - Namespace in which the pod is created
        timeout (int)
            - Seconds to wait for the pod to reach the Running phase

    Note:
        On any failure the error is logged, the pod is deleted and the
        process exits with status 1 (sys.exit), so callers never see an
        ordinary exception from this function.
    """
    pod_stat = None
    try:
        pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace)
        deadline = time.time() + timeout
        pod_stat = cli.read_namespaced_pod(
            name=body["metadata"]["name"], namespace=namespace
        )
        while pod_stat.status.phase != "Running":
            if time.time() > deadline:
                raise Exception("Starting pod failed")
            time.sleep(1)
            pod_stat = cli.read_namespaced_pod(
                name=body["metadata"]["name"], namespace=namespace
            )
    except Exception as error:
        logging.error("Pod creation failed %s" % error)
        # pod_stat is still None when the create call itself failed
        if pod_stat:
            logging.error(pod_stat.status.container_statuses)
        delete_pod(cli, body["metadata"]["name"], namespace)
        sys.exit(1)
def exec_cmd_in_pod(cli, command, pod_name, namespace, container=None):
    """
    Function used to execute a command in a running pod

    Args:
        cli
            - Client object exposing connect_get_namespaced_pod_exec
        command (list of strings)
            - Command and arguments to run inside the pod
        pod_name (string)
            - Name of the pod to execute in
        namespace (string)
            - Namespace the pod lives in
        container (string)
            - Optional container name; only forwarded when truthy

    Returns:
        Whatever the stream call returns, or False when it raises.
    """
    # Only name the container when one was requested
    stream_options = {"container": container} if container else {}
    stream_options.update(
        command=command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
    )
    try:
        return stream(
            cli.connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            **stream_options,
        )
    except Exception:
        return False
def create_ifb(cli, number, pod_name):
    """
    Function that creates virtual interfaces in a pod. Makes use of modprobe commands

    Loads the ifb module with `number` interfaces through a chroot into
    /host, then brings ifb0 .. ifb<number-1> up one by one. All commands run
    in pod `pod_name` in the "default" namespace; their results are ignored.
    """
    chroot_prefix = ['chroot', '/host']
    load_module = chroot_prefix + ['modprobe', 'ifb', 'numifbs=' + str(number)]
    exec_cmd_in_pod(cli, load_module, pod_name, 'default')

    for index in range(number):
        bring_up = chroot_prefix + ['ip', 'link', 'set', 'dev', 'ifb' + str(index), 'up']
        exec_cmd_in_pod(cli, bring_up, pod_name, 'default')
def delete_ifb(cli, pod_name):
    """
    Function that deletes all virtual interfaces in a pod. Makes use of modprobe command

    Unloads the ifb module through a chroot into /host, run in pod
    `pod_name` in the "default" namespace. The command result is ignored.
    """
    unload_module = ['chroot', '/host'] + ['modprobe', '-r', 'ifb']
    exec_cmd_in_pod(cli, unload_module, pod_name, 'default')
def list_pods(cli, namespace, label_selector=None):
    """
    Function used to list pods in a given namespace and having a certain label

    Args:
        cli
            - Client object exposing list_namespaced_pod
        namespace (string)
            - Namespace to list pods from
        label_selector (string)
            - Optional label selector; only forwarded when truthy

    Returns:
        List of pod names (strings)

    Raises:
        ApiException from the list call, after logging it.
    """
    query = {"pretty": True}
    if label_selector:
        query["label_selector"] = label_selector

    try:
        listing = cli.list_namespaced_pod(namespace, **query)
    except ApiException as api_error:
        logging.error(
            "Exception when calling CoreV1Api->list_namespaced_pod: %s\n", api_error
        )
        raise api_error

    return [pod.metadata.name for pod in listing.items]
def get_job_status(batch_cli, name, namespace="default"):
    """
    Function that retrieves the status of a running job in a given namespace

    Args:
        batch_cli
            - Client object exposing read_namespaced_job_status
        name (string)
            - Name of the job
        namespace (string)
            - Namespace the job lives in

    Returns:
        The API response of the status read

    Raises:
        Any exception from the read call, re-raised after being logged.
    """
    try:
        job_status = batch_cli.read_namespaced_job_status(
            name=name, namespace=namespace
        )
    except Exception as error:
        logging.error(
            "Exception when calling BatchV1Api->read_namespaced_job_status: %s", error
        )
        raise
    return job_status
def get_pod_log(cli, name, namespace="default"):
    """
    Function that retrieves the logs of a running pod in a given namespace

    The read is issued with _return_http_data_only=True and
    _preload_content=False; the client's return value is handed back as is.
    """
    request = {
        "name": name,
        "namespace": namespace,
        "_return_http_data_only": True,
        "_preload_content": False,
    }
    return cli.read_namespaced_pod_log(**request)
def read_pod(cli, name, namespace="default"):
    """
    Function that retrieves the info of a running pod in a given namespace

    Thin wrapper: returns whatever cli.read_namespaced_pod returns for the
    given name and namespace.
    """
    pod_info = cli.read_namespaced_pod(name=name, namespace=namespace)
    return pod_info
def delete_job(batch_cli, name, namespace="default"):
    """
    Deletes a job with the input name and namespace

    The job is deleted with foreground propagation and a zero-second grace
    period.

    Args:
        batch_cli
            - Client object exposing delete_namespaced_job
        name (string)
            - Name of the job to delete
        namespace (string)
            - Namespace the job lives in

    Returns:
        The API response on success. None when the API call raises an
        ApiException (the error is only logged).

    Note:
        Any other exception is logged and terminates the process with
        status 1 (sys.exit).
    """
    try:
        api_response = batch_cli.delete_namespaced_job(
            name=name,
            namespace=namespace,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground", grace_period_seconds=0
            ),
        )
        logging.debug("Job deleted. status='%s'" % str(api_response.status))
        return api_response
    except ApiException as api:
        # The message used to name create_namespaced_job, which pointed
        # anyone reading the logs at the wrong API call.
        logging.warning(
            "Exception when calling BatchV1Api->delete_namespaced_job: %s" % api
        )
        # Only a 404 actually means the job is already gone
        if api.status == 404:
            logging.warning("Job already deleted\n")
        return None
    except Exception as e:
        logging.error(
            "Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e
        )
        sys.exit(1)
def list_ready_nodes(cli, label_selector=None):
    """
    Returns a list of ready nodes

    A node counts as ready when it carries a condition whose type is "Ready"
    and whose status is "True".

    Args:
        cli
            - Client object exposing list_node
        label_selector (string)
            - Optional label selector; only forwarded when truthy

    Returns:
        List of node names (strings)

    Raises:
        ApiException from the list call, after logging it.
    """
    query = {"pretty": True}
    if label_selector:
        query["label_selector"] = label_selector

    try:
        listing = cli.list_node(**query)
    except ApiException as api_error:
        logging.error("Exception when calling CoreV1Api->list_node: %s\n" % api_error)
        raise api_error

    return [
        node.metadata.name
        for node in listing.items
        for condition in node.status.conditions
        if str(condition.type) == "Ready" and str(condition.status) == "True"
    ]
def get_node(node_name, label_selector, instance_kill_count, cli):
    """
    Returns active node(s) on which the scenario can be performed

    Args:
        node_name (string)
            - Preferred node; returned on its own when it is ready
        label_selector (string)
            - Selector used to find candidate nodes when node_name is not
              given or not ready
        instance_kill_count (int)
            - Number of nodes to pick at random from the candidates
        cli
            - Client object passed through to list_ready_nodes

    Returns:
        List of node names. When instance_kill_count is greater than or
        equal to the number of ready candidates, all candidates are returned.

    Raises:
        Exception when no ready node matches the label selector.
    """
    if node_name in list_ready_nodes(cli):
        return [node_name]
    elif node_name:
        logging.info(
            "Node with provided node_name does not exist or the node might "
            "be in NotReady state."
        )
    nodes = list_ready_nodes(cli, label_selector)
    if not nodes:
        raise Exception("Ready nodes with the provided label selector do not exist")
    logging.info(
        "Ready nodes with the label selector %s: %s" % (label_selector, nodes)
    )
    number_of_nodes = len(nodes)
    if instance_kill_count >= number_of_nodes:
        # Asking for more nodes than exist used to drain the candidate list
        # and crash in random.randint(0, -1); return what is available.
        if instance_kill_count > number_of_nodes:
            logging.warning(
                "Requested %s nodes but only %s are ready; using all of them"
                % (instance_kill_count, number_of_nodes)
            )
        return nodes
    # Distinct random picks; a non-positive count yields an empty list
    return random.sample(nodes, max(instance_kill_count, 0))

View File

@@ -1,274 +0,0 @@
from kubernetes import config, client
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream
import sys
import time
import logging
import random
def setup_kubernetes(kubeconfig_path) -> client.ApiClient:
    """
    Sets up the Kubernetes client

    Args:
        kubeconfig_path
            - Path of the kubeconfig file to load; when None, the client
              library's default kubeconfig location is used instead

    Returns:
        An ApiClient built from the result of loading the kubeconfig
    """
    config_file = (
        config.KUBE_CONFIG_DEFAULT_LOCATION
        if kubeconfig_path is None
        else kubeconfig_path
    )
    # NOTE(review): load_kube_config appears to return None in the upstream
    # client, in which case ApiClient falls back to the default configuration
    # that the load call just populated - confirm.
    loaded = config.load_kube_config(config_file)
    return client.ApiClient(loaded)
def create_job(batch_cli, body, namespace="default"):
    """
    Function used to create a job from an already-parsed YAML config

    Args:
        batch_cli
            - Client object exposing create_namespaced_job
        body
            - Job manifest, handed to the API unchanged
        namespace (string)
            - Namespace in which the job is created

    Returns:
        The API response on success. None when the API call raises an
        ApiException (the error is only logged; a 409 status is additionally
        reported as "Job already present").

    Raises:
        Any other exception, re-raised after being logged.
    """
    try:
        return batch_cli.create_namespaced_job(body=body, namespace=namespace)
    except ApiException as api_error:
        logging.warning(
            "Exception when calling BatchV1Api->create_job: %s", api_error
        )
        already_exists = api_error.status == 409
        if already_exists:
            logging.warning("Job already present")
    except Exception as error:
        logging.error(
            "Exception when calling BatchV1Api->create_namespaced_job: %s", error
        )
        raise
def delete_pod(cli, name, namespace):
    """
    Function that deletes a pod and waits until deletion is complete

    The pod is re-read once per second for as long as the read call returns
    a truthy object. A 404 ApiException is treated as "pod is gone".

    Args:
        cli
            - Client object exposing delete_namespaced_pod and
              read_namespaced_pod
        name (string)
            - Name of the pod to delete
        namespace (string)
            - Namespace the pod lives in

    Raises:
        ApiException with any status other than 404, after logging it.
    """
    try:
        cli.delete_namespaced_pod(name=name, namespace=namespace)
        while True:
            still_there = cli.read_namespaced_pod(name=name, namespace=namespace)
            if not still_there:
                break
            time.sleep(1)
    except ApiException as api_error:
        if api_error.status != 404:
            logging.error("Failed to delete pod %s" % api_error)
            raise api_error
        logging.info("Pod deleted")
def create_pod(cli, body, namespace, timeout=120):
    """
    Function used to create a pod from a YAML config and wait for it to run

    The pod is polled once per second until its phase is "Running" or the
    timeout has elapsed.

    Args:
        cli
            - Client object exposing create_namespaced_pod and
              read_namespaced_pod
        body (dict)
            - Pod manifest; body["metadata"]["name"] is used to poll the pod
        namespace (string)
            - Namespace in which the pod is created
        timeout (int)
            - Seconds to wait for the pod to reach the Running phase

    Note:
        On any failure the error is logged, the pod is deleted and the
        process exits with status 1 (sys.exit), so callers never see an
        ordinary exception from this function.
    """
    pod_stat = None
    try:
        pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace)
        deadline = time.time() + timeout
        pod_stat = cli.read_namespaced_pod(
            name=body["metadata"]["name"], namespace=namespace
        )
        while pod_stat.status.phase != "Running":
            if time.time() > deadline:
                raise Exception("Starting pod failed")
            time.sleep(1)
            pod_stat = cli.read_namespaced_pod(
                name=body["metadata"]["name"], namespace=namespace
            )
    except Exception as error:
        logging.error("Pod creation failed %s" % error)
        # pod_stat is still None when the create call itself failed
        if pod_stat:
            logging.error(pod_stat.status.container_statuses)
        delete_pod(cli, body["metadata"]["name"], namespace)
        sys.exit(1)
def exec_cmd_in_pod(cli, command, pod_name, namespace, container=None):
    """
    Function used to execute a command in a running pod

    Args:
        cli
            - Client object exposing connect_get_namespaced_pod_exec
        command (list of strings)
            - Command and arguments to run inside the pod
        pod_name (string)
            - Name of the pod to execute in
        namespace (string)
            - Namespace the pod lives in
        container (string)
            - Optional container name; only forwarded when truthy

    Returns:
        Whatever the stream call returns, or False when it raises an
        Exception (the failure is logged).
    """
    # Only name the container when one was requested
    stream_options = {"container": container} if container else {}
    stream_options.update(
        command=command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
    )
    try:
        return stream(
            cli.connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            **stream_options,
        )
    except Exception as e:
        # Catch Exception rather than BaseException so that Ctrl-C and
        # SystemExit are not silently turned into a False return value.
        logging.warning(
            "Failed to execute %s in pod %s/%s: %s", command, namespace, pod_name, e
        )
        return False
def list_pods(cli, namespace, label_selector=None, exclude_label=None):
    """
    Function used to list pods in a given namespace and having a certain label,
    excluding pods that carry exclude_label

    Args:
        cli
            - Client object exposing list_namespaced_pod
        namespace (string)
            - Namespace to list pods from
        label_selector (string)
            - Optional label selector; only forwarded when truthy
        exclude_label (string)
            - Optional "key=value" label; pods whose labels contain exactly
              this key/value pair are left out of the result

    Returns:
        List of pod names (strings)

    Raises:
        ValueError when exclude_label is given but is not of the form
        key=value.
        ApiException from the list call, after logging it.
    """
    # Parse the exclusion once, up front, instead of once per pod
    excluded = None
    if exclude_label:
        exclude_key, separator, exclude_value = exclude_label.partition("=")
        if not separator:
            raise ValueError(
                "exclude_label must be of the form key=value, got %r" % exclude_label
            )
        excluded = (exclude_key, exclude_value)

    query = {"pretty": True}
    if label_selector:
        query["label_selector"] = label_selector
    try:
        ret = cli.list_namespaced_pod(namespace, **query)
    except ApiException as e:
        logging.error(
            "Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e
        )
        raise e

    pods = []
    for pod in ret.items:
        labels = pod.metadata.labels
        # Skip pods with the exclude label if specified
        if (
            excluded
            and labels
            and excluded[0] in labels
            and labels[excluded[0]] == excluded[1]
        ):
            continue
        pods.append(pod.metadata.name)
    return pods
def get_job_status(batch_cli, name, namespace="default"):
    """
    Function that retrieves the status of a running job in a given namespace

    Args:
        batch_cli
            - Client object exposing read_namespaced_job_status
        name (string)
            - Name of the job
        namespace (string)
            - Namespace the job lives in

    Returns:
        The API response of the status read

    Raises:
        Any exception from the read call, re-raised after being logged.
    """
    try:
        job_status = batch_cli.read_namespaced_job_status(
            name=name, namespace=namespace
        )
    except Exception as error:
        logging.error(
            "Exception when calling BatchV1Api->read_namespaced_job_status: %s", error
        )
        raise
    return job_status
def get_pod_log(cli, name, namespace="default"):
    """
    Function that retrieves the logs of a running pod in a given namespace

    The read is issued with _return_http_data_only=True and
    _preload_content=False; the client's return value is handed back as is.
    """
    request = {
        "name": name,
        "namespace": namespace,
        "_return_http_data_only": True,
        "_preload_content": False,
    }
    return cli.read_namespaced_pod_log(**request)
def read_pod(cli, name, namespace="default"):
    """
    Function that retrieves the info of a running pod in a given namespace

    Thin wrapper: returns whatever cli.read_namespaced_pod returns for the
    given name and namespace.
    """
    pod_info = cli.read_namespaced_pod(name=name, namespace=namespace)
    return pod_info
def delete_job(batch_cli, name, namespace="default"):
    """
    Deletes a job with the input name and namespace

    The job is deleted with foreground propagation and a zero-second grace
    period.

    Args:
        batch_cli
            - Client object exposing delete_namespaced_job
        name (string)
            - Name of the job to delete
        namespace (string)
            - Namespace the job lives in

    Returns:
        The API response on success. None when the API call raises an
        ApiException (the error is only logged).

    Note:
        Any other exception is logged and terminates the process with
        status 1 (sys.exit).
    """
    try:
        api_response = batch_cli.delete_namespaced_job(
            name=name,
            namespace=namespace,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground", grace_period_seconds=0
            ),
        )
        logging.debug("Job deleted. status='%s'" % str(api_response.status))
        return api_response
    except ApiException as api:
        # The message used to name create_namespaced_job, which pointed
        # anyone reading the logs at the wrong API call.
        logging.warning(
            "Exception when calling BatchV1Api->delete_namespaced_job: %s" % api
        )
        # Only a 404 actually means the job is already gone
        if api.status == 404:
            logging.warning("Job already deleted\n")
        return None
    except Exception as e:
        logging.error(
            "Exception when calling BatchV1Api->delete_namespaced_job: %s\n" % e
        )
        sys.exit(1)
def list_ready_nodes(cli, label_selector=None):
    """
    Returns a list of ready nodes

    A node counts as ready when it carries a condition whose type is "Ready"
    and whose status is "True".

    Args:
        cli
            - Client object exposing list_node
        label_selector (string)
            - Optional label selector; only forwarded when truthy

    Returns:
        List of node names (strings)

    Raises:
        ApiException from the list call, after logging it.
    """
    query = {"pretty": True}
    if label_selector:
        query["label_selector"] = label_selector

    try:
        listing = cli.list_node(**query)
    except ApiException as api_error:
        logging.error("Exception when calling CoreV1Api->list_node: %s\n" % api_error)
        raise api_error

    return [
        node.metadata.name
        for node in listing.items
        for condition in node.status.conditions
        if str(condition.type) == "Ready" and str(condition.status) == "True"
    ]
def get_node(node_name, label_selector, instance_kill_count, cli):
    """
    Returns active node(s) on which the scenario can be performed

    Args:
        node_name (string)
            - Preferred node; returned on its own when it is ready
        label_selector (string)
            - Selector used to find candidate nodes when node_name is not
              given or not ready
        instance_kill_count (int)
            - Number of nodes to pick at random from the candidates
        cli
            - Client object passed through to list_ready_nodes

    Returns:
        List of node names. When instance_kill_count is greater than or
        equal to the number of ready candidates, all candidates are returned.

    Raises:
        Exception when no ready node matches the label selector.
    """
    if node_name in list_ready_nodes(cli):
        return [node_name]
    elif node_name:
        logging.info(
            "Node with provided node_name does not exist or the node might "
            "be in NotReady state."
        )
    nodes = list_ready_nodes(cli, label_selector)
    if not nodes:
        raise Exception("Ready nodes with the provided label selector do not exist")
    logging.info("Ready nodes with the label selector %s: %s" % (label_selector, nodes))
    number_of_nodes = len(nodes)
    if instance_kill_count >= number_of_nodes:
        # Asking for more nodes than exist used to drain the candidate list
        # and crash in random.randint(0, -1); return what is available.
        if instance_kill_count > number_of_nodes:
            logging.warning(
                "Requested %s nodes but only %s are ready; using all of them"
                % (instance_kill_count, number_of_nodes)
            )
        return nodes
    # Distinct random picks; a non-positive count yields an empty list
    return random.sample(nodes, max(instance_kill_count, 0))

View File

@@ -1,3 +1,31 @@
#!/usr/bin/env python3
"""
Test suite for ingress network shaping plugin
This test suite covers the ingress_shaping module functions using mocks
to avoid needing actual Kubernetes infrastructure.
Test Coverage:
- Serialization of config and output objects
- Interface discovery: get_default_interface, verify_interface, get_node_interfaces
- Virtual interface management: create_ifb, delete_ifb, create_virtual_interfaces, delete_virtual_interfaces
- Job management: get_job_pods, wait_for_job, delete_jobs
- Traffic shaping command generation: get_ingress_cmd
- Filter application: apply_ingress_filter
- End-to-end network_chaos flows: parallel, serial, error cases
IMPORTANT: These tests use mocking and do NOT require any Kubernetes cluster.
All API calls are mocked via unittest.mock.
Usage:
python -m unittest tests/test_ingress_network_plugin.py -v
# Run with coverage
python -m coverage run -a -m unittest tests/test_ingress_network_plugin.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import Mock, patch
from arcaflow_plugin_sdk import plugin
@@ -40,168 +68,144 @@ class NetworkScenariosTest(unittest.TestCase):
self.fail,
)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_get_default_interface(self, mock_kube_helper):
def test_get_default_interface(self):
"""Test getting default interface from a node"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml_content"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.exec_cmd_in_pod.return_value = (
mock_kubecli.create_pod.return_value = None
mock_kubecli.exec_cmd_in_pod.return_value = (
"default via 192.168.1.1 dev eth0 proto dhcp metric 100\n"
"172.17.0.0/16 dev docker0 proto kernel scope link src 172.17.0.1"
)
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test
result = ingress_shaping.get_default_interface(
node="test-node",
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(result, ["eth0"])
mock_kube_helper.create_pod.assert_called_once()
mock_kube_helper.exec_cmd_in_pod.assert_called_once()
mock_kube_helper.delete_pod.assert_called_once()
mock_kubecli.create_pod.assert_called_once()
mock_kubecli.exec_cmd_in_pod.assert_called_once()
mock_kubecli.delete_pod.assert_called_once()
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_verify_interface_with_empty_list(self, mock_kube_helper):
def test_verify_interface_with_empty_list(self):
"""Test verifying interface when input list is empty"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml_content"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.exec_cmd_in_pod.return_value = (
mock_kubecli.create_pod.return_value = None
mock_kubecli.exec_cmd_in_pod.return_value = (
"default via 192.168.1.1 dev eth0 proto dhcp metric 100\n"
)
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test
result = ingress_shaping.verify_interface(
input_interface_list=[],
node="test-node",
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(result, ["eth0"])
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_verify_interface_with_valid_interfaces(self, mock_kube_helper):
def test_verify_interface_with_valid_interfaces(self):
"""Test verifying interface with valid interface list"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml_content"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.exec_cmd_in_pod.return_value = (
mock_kubecli.create_pod.return_value = None
mock_kubecli.exec_cmd_in_pod.return_value = (
"eth0 UP 192.168.1.10/24\n"
"eth1 UP 10.0.0.5/24\n"
"lo UNKNOWN 127.0.0.1/8\n"
)
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test
result = ingress_shaping.verify_interface(
input_interface_list=["eth0", "eth1"],
node="test-node",
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(result, ["eth0", "eth1"])
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_verify_interface_with_invalid_interface(self, mock_kube_helper):
def test_verify_interface_with_invalid_interface(self):
"""Test verifying interface with an interface that doesn't exist"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml_content"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.exec_cmd_in_pod.return_value = (
mock_kubecli.create_pod.return_value = None
mock_kubecli.exec_cmd_in_pod.return_value = (
"eth0 UP 192.168.1.10/24\n"
"lo UNKNOWN 127.0.0.1/8\n"
)
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test - should raise exception
with self.assertRaises(Exception) as context:
ingress_shaping.verify_interface(
input_interface_list=["eth0", "eth99"],
node="test-node",
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
self.assertIn("Interface eth99 not found", str(context.exception))
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_default_interface')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_get_node_interfaces_with_label_selector(self, mock_kube_helper, mock_get_default_interface):
def test_get_node_interfaces_with_label_selector(self, mock_get_default_interface):
"""Test getting node interfaces using label selector"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_kube_helper.get_node.return_value = ["node1", "node2"]
mock_kubecli.get_node.return_value = ["node1", "node2"]
mock_get_default_interface.return_value = ["eth0"]
# Test
result = ingress_shaping.get_node_interfaces(
node_interface_dict=None,
label_selector="node-role.kubernetes.io/worker",
instance_count=2,
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(result, {"node1": ["eth0"], "node2": ["eth0"]})
self.assertEqual(mock_get_default_interface.call_count, 2)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.verify_interface')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_get_node_interfaces_with_node_dict(self, mock_kube_helper, mock_verify_interface):
def test_get_node_interfaces_with_node_dict(self, mock_verify_interface):
"""Test getting node interfaces with provided node interface dictionary"""
# Setup mocks
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_kube_helper.get_node.return_value = ["node1"]
mock_kubecli.get_node.return_value = ["node1"]
mock_verify_interface.return_value = ["eth0", "eth1"]
# Test
result = ingress_shaping.get_node_interfaces(
node_interface_dict={"node1": ["eth0", "eth1"]},
label_selector=None,
instance_count=1,
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(result, {"node1": ["eth0", "eth1"]})
mock_verify_interface.assert_called_once()
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_get_node_interfaces_no_selector_no_dict(self, mock_kube_helper):
def test_get_node_interfaces_no_selector_no_dict(self):
"""Test that exception is raised when both node dict and label selector are missing"""
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
with self.assertRaises(Exception) as context:
@@ -210,148 +214,124 @@ class NetworkScenariosTest(unittest.TestCase):
label_selector=None,
instance_count=1,
pod_template=mock_pod_template,
cli=mock_cli,
kubecli=mock_kubecli,
image="quay.io/krkn-chaos/krkn:tools"
)
self.assertIn("label selector must be provided", str(context.exception))
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_create_ifb(self, mock_kube_helper):
def test_create_ifb(self):
"""Test creating virtual interfaces"""
mock_cli = Mock()
mock_kube_helper.exec_cmd_in_pod.return_value = None
mock_kubecli = Mock()
mock_kubecli.exec_cmd_in_pod.return_value = None
# Test
ingress_shaping.create_ifb(cli=mock_cli, number=2, pod_name="test-pod")
ingress_shaping.create_ifb(kubecli=mock_kubecli, number=2, pod_name="test-pod")
# Assertions
# Should call modprobe once and ip link set for each interface
self.assertEqual(mock_kube_helper.exec_cmd_in_pod.call_count, 3)
self.assertEqual(mock_kubecli.exec_cmd_in_pod.call_count, 3)
# Verify modprobe call
first_call = mock_kube_helper.exec_cmd_in_pod.call_args_list[0]
self.assertIn("modprobe", first_call[0][1])
self.assertIn("numifbs=2", first_call[0][1])
# Verify modprobe call — cmd is now the first positional arg
first_call = mock_kubecli.exec_cmd_in_pod.call_args_list[0]
self.assertIn("modprobe", first_call[0][0])
self.assertIn("numifbs=2", first_call[0][0])
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_delete_ifb(self, mock_kube_helper):
def test_delete_ifb(self):
"""Test deleting virtual interfaces"""
mock_cli = Mock()
mock_kube_helper.exec_cmd_in_pod.return_value = None
mock_kubecli = Mock()
mock_kubecli.exec_cmd_in_pod.return_value = None
# Test
ingress_shaping.delete_ifb(cli=mock_cli, pod_name="test-pod")
ingress_shaping.delete_ifb(kubecli=mock_kubecli, pod_name="test-pod")
# Assertions
mock_kube_helper.exec_cmd_in_pod.assert_called_once()
call_args = mock_kube_helper.exec_cmd_in_pod.call_args[0][1]
mock_kubecli.exec_cmd_in_pod.assert_called_once()
# cmd is now the first positional arg
call_args = mock_kubecli.exec_cmd_in_pod.call_args[0][0]
self.assertIn("modprobe", call_args)
self.assertIn("-r", call_args)
self.assertIn("ifb", call_args)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_get_job_pods(self, mock_kube_helper):
def test_get_job_pods(self):
"""Test getting pods associated with a job"""
mock_cli = Mock()
mock_kubecli = Mock()
mock_api_response = Mock()
mock_api_response.metadata.labels = {"controller-uid": "test-uid-123"}
mock_kubecli.list_pods.return_value = ["pod1", "pod2"]
mock_kube_helper.list_pods.return_value = ["pod1", "pod2"]
result = ingress_shaping.get_job_pods(kubecli=mock_kubecli, api_response=mock_api_response)
# Test
result = ingress_shaping.get_job_pods(cli=mock_cli, api_response=mock_api_response)
# Assertions
self.assertEqual(result, "pod1")
mock_kube_helper.list_pods.assert_called_once_with(
mock_cli,
mock_kubecli.list_pods.assert_called_once_with(
label_selector="controller-uid=test-uid-123",
namespace="default"
)
@patch('time.sleep', return_value=None)
@patch('time.time')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_wait_for_job_success(self, mock_kube_helper, mock_time, mock_sleep):
def test_wait_for_job_success(self, mock_time, mock_sleep):
"""Test waiting for jobs to complete successfully"""
mock_batch_cli = Mock()
mock_time.side_effect = [0, 10, 20] # Simulate time progression
mock_kubecli = Mock()
mock_time.side_effect = [0, 10, 20]
# First job succeeds
mock_response1 = Mock()
mock_response1.status.succeeded = 1
mock_response1.status.failed = None
# Second job succeeds
mock_response2 = Mock()
mock_response2.status.succeeded = 1
mock_response2.status.failed = None
mock_kube_helper.get_job_status.side_effect = [mock_response1, mock_response2]
mock_kubecli.get_job_status.side_effect = [mock_response1, mock_response2]
# Test
ingress_shaping.wait_for_job(
batch_cli=mock_batch_cli,
kubecli=mock_kubecli,
job_list=["job1", "job2"],
timeout=300
)
# Assertions
self.assertEqual(mock_kube_helper.get_job_status.call_count, 2)
self.assertEqual(mock_kubecli.get_job_status.call_count, 2)
@patch('time.sleep', return_value=None)
@patch('time.time')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_wait_for_job_timeout(self, mock_kube_helper, mock_time, mock_sleep):
def test_wait_for_job_timeout(self, mock_time, mock_sleep):
"""Test waiting for jobs times out"""
mock_batch_cli = Mock()
mock_time.side_effect = [0, 350] # Simulate timeout
mock_kubecli = Mock()
mock_time.side_effect = [0, 350]
mock_response = Mock()
mock_response.status.succeeded = None
mock_response.status.failed = None
mock_kube_helper.get_job_status.return_value = mock_response
mock_kubecli.get_job_status.return_value = mock_response
# Test - should raise exception
with self.assertRaises(Exception) as context:
ingress_shaping.wait_for_job(
batch_cli=mock_batch_cli,
kubecli=mock_kubecli,
job_list=["job1"],
timeout=300
)
self.assertIn("timeout", str(context.exception))
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_delete_jobs(self, mock_kube_helper):
def test_delete_jobs(self):
"""Test deleting jobs"""
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli = Mock()
mock_response = Mock()
mock_response.status.failed = None
mock_kube_helper.get_job_status.return_value = mock_response
mock_kube_helper.delete_job.return_value = None
mock_kubecli.get_job_status.return_value = mock_response
mock_kubecli.delete_job.return_value = None
# Test
ingress_shaping.delete_jobs(
cli=mock_cli,
batch_cli=mock_batch_cli,
kubecli=mock_kubecli,
job_list=["job1", "job2"]
)
# Assertions
self.assertEqual(mock_kube_helper.get_job_status.call_count, 2)
self.assertEqual(mock_kube_helper.delete_job.call_count, 2)
self.assertEqual(mock_kubecli.get_job_status.call_count, 2)
self.assertEqual(mock_kubecli.delete_job.call_count, 2)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_job_pods')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_delete_jobs_with_failed_job(self, mock_kube_helper, mock_get_job_pods):
def test_delete_jobs_with_failed_job(self, mock_get_job_pods):
"""Test deleting jobs when one has failed"""
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli = Mock()
mock_response = Mock()
mock_response.status.failed = 1
@@ -362,22 +342,19 @@ class NetworkScenariosTest(unittest.TestCase):
mock_log_response = Mock()
mock_log_response.data.decode.return_value = "Error log content"
mock_kube_helper.get_job_status.return_value = mock_response
mock_kubecli.get_job_status.return_value = mock_response
mock_get_job_pods.return_value = "failed-pod"
mock_kube_helper.read_pod.return_value = mock_pod_status
mock_kube_helper.get_pod_log.return_value = mock_log_response
mock_kube_helper.delete_job.return_value = None
mock_kubecli.read_pod.return_value = mock_pod_status
mock_kubecli.get_pod_log.return_value = mock_log_response
mock_kubecli.delete_job.return_value = None
# Test
ingress_shaping.delete_jobs(
cli=mock_cli,
batch_cli=mock_batch_cli,
kubecli=mock_kubecli,
job_list=["failed-job"]
)
# Assertions
mock_kube_helper.read_pod.assert_called_once()
mock_kube_helper.get_pod_log.assert_called_once()
mock_kubecli.read_pod.assert_called_once()
mock_kubecli.get_pod_log.assert_called_once()
def test_get_ingress_cmd_basic(self):
"""Test generating ingress traffic shaping commands"""
@@ -387,7 +364,6 @@ class NetworkScenariosTest(unittest.TestCase):
duration=120
)
# Assertions
self.assertIn("tc qdisc add dev eth0 handle ffff: ingress", result)
self.assertIn("tc filter add dev eth0", result)
self.assertIn("ifb0", result)
@@ -403,7 +379,6 @@ class NetworkScenariosTest(unittest.TestCase):
duration=120
)
# Assertions
self.assertIn("eth0", result)
self.assertIn("eth1", result)
self.assertIn("ifb0", result)
@@ -423,7 +398,6 @@ class NetworkScenariosTest(unittest.TestCase):
duration=120
)
# Assertions
self.assertIn("delay 50ms", result)
self.assertIn("loss 0.02", result)
self.assertIn("rate 100mbit", result)
@@ -442,12 +416,9 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('yaml.safe_load')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.create_virtual_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_ingress_cmd')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_apply_ingress_filter(self, mock_kube_helper, mock_get_cmd, mock_create_virtual, mock_yaml):
def test_apply_ingress_filter(self, mock_get_cmd, mock_create_virtual, mock_yaml):
"""Test applying ingress filters to a node"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_job_template = Mock()
mock_job_template.render.return_value = "job_yaml"
@@ -460,75 +431,66 @@ class NetworkScenariosTest(unittest.TestCase):
mock_yaml.return_value = {"metadata": {"name": "test-job"}}
mock_get_cmd.return_value = "tc commands"
mock_kube_helper.create_job.return_value = Mock()
mock_kubecli.create_job.return_value = Mock()
# Test
result = ingress_shaping.apply_ingress_filter(
cfg=mock_cfg,
interface_list=["eth0"],
node="node1",
pod_template=mock_pod_template,
job_template=mock_job_template,
batch_cli=mock_batch_cli,
cli=mock_cli,
kubecli=mock_kubecli,
create_interfaces=True,
param_selector="all",
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
mock_create_virtual.assert_called_once()
mock_get_cmd.assert_called_once()
mock_kube_helper.create_job.assert_called_once()
mock_kubecli.create_job.assert_called_once()
self.assertEqual(result, "test-job")
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_create_virtual_interfaces(self, mock_kube_helper):
def test_create_virtual_interfaces(self):
"""Test creating virtual interfaces on a node"""
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.exec_cmd_in_pod.return_value = None
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.create_pod.return_value = None
mock_kubecli.exec_cmd_in_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test
ingress_shaping.create_virtual_interfaces(
cli=mock_cli,
kubecli=mock_kubecli,
interface_list=["eth0", "eth1"],
node="test-node",
pod_template=mock_pod_template,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
mock_kube_helper.create_pod.assert_called_once()
mock_kube_helper.delete_pod.assert_called_once()
mock_kubecli.create_pod.assert_called_once()
mock_kubecli.delete_pod.assert_called_once()
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_ifb')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
def test_delete_virtual_interfaces(self, mock_kube_helper, mock_delete_ifb):
def test_delete_virtual_interfaces(self, mock_delete_ifb):
"""Test deleting virtual interfaces from nodes"""
mock_cli = Mock()
mock_kubecli = Mock()
mock_pod_template = Mock()
mock_pod_template.render.return_value = "pod_yaml"
mock_kube_helper.create_pod.return_value = None
mock_kube_helper.delete_pod.return_value = None
mock_kubecli.create_pod.return_value = None
mock_kubecli.delete_pod.return_value = None
# Test
ingress_shaping.delete_virtual_interfaces(
cli=mock_cli,
kubecli=mock_kubecli,
node_list=["node1", "node2"],
pod_template=mock_pod_template,
image="quay.io/krkn-chaos/krkn:tools"
)
# Assertions
self.assertEqual(mock_kube_helper.create_pod.call_count, 2)
self.assertEqual(mock_kubecli.create_pod.call_count, 2)
self.assertEqual(mock_delete_ifb.call_count, 2)
self.assertEqual(mock_kube_helper.delete_pod.call_count, 2)
self.assertEqual(mock_kubecli.delete_pod.call_count, 2)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.Environment')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.FileSystemLoader')
@@ -538,22 +500,18 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('krkn.scenario_plugins.native.network.ingress_shaping.wait_for_job')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes')
def test_network_chaos_parallel_execution(
self, mock_kube_helper, mock_get_nodes, mock_apply_filter,
self, mock_kubecli_class, mock_get_nodes, mock_apply_filter,
mock_wait_job, mock_delete_virtual, mock_delete_jobs, mock_yaml,
mock_file_loader, mock_env
):
"""Test network chaos with parallel execution"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli_class.return_value = Mock()
mock_yaml.return_value = {"metadata": {"name": "test-pod"}}
mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli)
mock_get_nodes.return_value = {"node1": ["eth0"], "node2": ["eth1"]}
mock_apply_filter.side_effect = ["job1", "job2"]
# Test
cfg = ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/worker",
instance_count=2,
@@ -565,7 +523,6 @@ class NetworkScenariosTest(unittest.TestCase):
output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run")
# Assertions
self.assertEqual(output_id, "success")
self.assertEqual(output_data.filter_direction, "ingress")
self.assertEqual(output_data.execution_type, "parallel")
@@ -582,22 +539,18 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('krkn.scenario_plugins.native.network.ingress_shaping.wait_for_job')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes')
def test_network_chaos_serial_execution(
self, mock_kube_helper, mock_get_nodes, mock_apply_filter,
self, mock_kubecli_class, mock_get_nodes, mock_apply_filter,
mock_wait_job, mock_delete_virtual, mock_delete_jobs, mock_sleep, mock_yaml,
mock_file_loader, mock_env
):
"""Test network chaos with serial execution"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli_class.return_value = Mock()
mock_yaml.return_value = {"metadata": {"name": "test-pod"}}
mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli)
mock_get_nodes.return_value = {"node1": ["eth0"]}
mock_apply_filter.return_value = "job1"
# Test
cfg = ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/worker",
instance_count=1,
@@ -609,12 +562,9 @@ class NetworkScenariosTest(unittest.TestCase):
output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run")
# Assertions
self.assertEqual(output_id, "success")
self.assertEqual(output_data.execution_type, "serial")
# Should be called once per parameter per node
self.assertEqual(mock_apply_filter.call_count, 2)
# Should wait for jobs twice (once per parameter)
self.assertEqual(mock_wait_job.call_count, 2)
@patch('krkn.scenario_plugins.native.network.ingress_shaping.Environment')
@@ -623,20 +573,16 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_jobs')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes')
def test_network_chaos_invalid_execution_type(
self, mock_kube_helper, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml,
self, mock_kubecli_class, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml,
mock_file_loader, mock_env
):
"""Test network chaos with invalid execution type"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli_class.return_value = Mock()
mock_yaml.return_value = {"metadata": {"name": "test-pod"}}
mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli)
mock_get_nodes.return_value = {"node1": ["eth0"]}
# Test
cfg = ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/worker",
instance_count=1,
@@ -647,7 +593,6 @@ class NetworkScenariosTest(unittest.TestCase):
output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run")
# Assertions
self.assertEqual(output_id, "error")
self.assertIn("Invalid execution type", output_data.error)
@@ -657,20 +602,16 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_jobs')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes')
def test_network_chaos_get_nodes_error(
self, mock_kube_helper, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml,
self, mock_kubecli_class, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml,
mock_file_loader, mock_env
):
"""Test network chaos when getting nodes fails"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli_class.return_value = Mock()
mock_yaml.return_value = {"metadata": {"name": "test-pod"}}
mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli)
mock_get_nodes.side_effect = Exception("Failed to get nodes")
# Test
cfg = ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/worker",
instance_count=1,
@@ -679,7 +620,6 @@ class NetworkScenariosTest(unittest.TestCase):
output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run")
# Assertions
self.assertEqual(output_id, "error")
self.assertIn("Failed to get nodes", output_data.error)
@@ -690,22 +630,18 @@ class NetworkScenariosTest(unittest.TestCase):
@patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper')
@patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes')
def test_network_chaos_apply_filter_error(
self, mock_kube_helper, mock_get_nodes, mock_apply_filter,
self, mock_kubecli_class, mock_get_nodes, mock_apply_filter,
mock_delete_virtual, mock_delete_jobs, mock_yaml,
mock_file_loader, mock_env
):
"""Test network chaos when applying filter fails"""
# Setup mocks
mock_cli = Mock()
mock_batch_cli = Mock()
mock_kubecli_class.return_value = Mock()
mock_yaml.return_value = {"metadata": {"name": "test-pod"}}
mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli)
mock_get_nodes.return_value = {"node1": ["eth0"]}
mock_apply_filter.side_effect = Exception("Failed to apply filter")
# Test
cfg = ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/worker",
instance_count=1,
@@ -715,10 +651,8 @@ class NetworkScenariosTest(unittest.TestCase):
output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run")
# Assertions
self.assertEqual(output_id, "error")
self.assertIn("Failed to apply filter", output_data.error)
# Cleanup should still be called
mock_delete_virtual.assert_called_once()

View File

@@ -1,13 +1,31 @@
#!/usr/bin/env python3
"""
Test suite for pod network outage plugin
This test suite covers the get_test_pods function in pod_network_outage_plugin
using mocks to avoid needing actual Kubernetes infrastructure.
Test Coverage:
- get_test_pods with exclude_label filters
- get_test_pods with pod_name taking precedence over label filters
- get_test_pods with both pod_name and exclude_label
IMPORTANT: These tests use mocking and do NOT require any Kubernetes cluster.
All Kubernetes API calls are mocked via unittest.mock.
Usage:
python -m unittest tests/test_pod_network_outage.py -v
# Run with coverage
python -m coverage run -a -m unittest tests/test_pod_network_outage.py -v
Assisted By: Claude Code
"""
import unittest
from unittest.mock import MagicMock, patch
import sys
import os
from unittest.mock import MagicMock
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from krkn.scenario_plugins.native.pod_network_outage.kubernetes_functions import (
list_pods,
)
from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin import (
get_test_pods,
)
@@ -15,36 +33,33 @@ from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin i
class TestPodNetworkOutage(unittest.TestCase):
def test_list_pods_with_exclude_label(self):
"""Test that list_pods correctly excludes pods with matching exclude_label"""
# Create mock pod items
pod1 = MagicMock()
pod1.metadata.name = "pod1"
pod1.metadata.labels = {"app": "test", "skip": "true"}
"""Test that get_test_pods passes exclude_label to kubecli.list_pods"""
mock_kubecli = MagicMock()
mock_kubecli.list_pods.return_value = ["pod2", "pod3"]
pod2 = MagicMock()
pod2.metadata.name = "pod2"
pod2.metadata.labels = {"app": "test"}
result = get_test_pods(None, "app=test", "test-namespace", mock_kubecli, "skip=true")
pod3 = MagicMock()
pod3.metadata.name = "pod3"
pod3.metadata.labels = {"app": "test", "skip": "false"}
# Create mock API response
mock_response = MagicMock()
mock_response.items = [pod1, pod2, pod3]
# Create mock client
mock_cli = MagicMock()
mock_cli.list_namespaced_pod.return_value = mock_response
# Test without exclude_label
result = list_pods(mock_cli, "test-namespace", "app=test")
self.assertEqual(result, ["pod1", "pod2", "pod3"])
# Test with exclude_label
result = list_pods(mock_cli, "test-namespace", "app=test", "skip=true")
mock_kubecli.list_pods.assert_called_once_with(
label_selector="app=test",
namespace="test-namespace",
exclude_label="skip=true",
)
self.assertEqual(result, ["pod2", "pod3"])
def test_list_pods_without_exclude_label(self):
"""Test that get_test_pods works without exclude_label"""
mock_kubecli = MagicMock()
mock_kubecli.list_pods.return_value = ["pod1", "pod2", "pod3"]
result = get_test_pods(None, "app=test", "test-namespace", mock_kubecli)
mock_kubecli.list_pods.assert_called_once_with(
label_selector="app=test",
namespace="test-namespace",
exclude_label=None,
)
self.assertEqual(result, ["pod1", "pod2", "pod3"])
def test_get_test_pods_with_exclude_label(self):
"""Test that get_test_pods passes exclude_label to list_pods correctly"""
# Create mock kubecli