From 626e203d337e919c7b32c4fa36a5fc8ba56be6ff Mon Sep 17 00:00:00 2001 From: Paige Patton <64206430+paigerube14@users.noreply.github.com> Date: Tue, 31 Mar 2026 07:46:40 -0500 Subject: [PATCH] removing kubernetes functions (#1205) Signed-off-by: Paige Patton --- .../native/network/ingress_shaping.py | 182 ++++----- .../native/network/kubernetes_functions.py | 284 -------------- .../kubernetes_functions.py | 274 ------------- tests/test_ingress_network_plugin.py | 364 +++++++----------- tests/test_pod_network_outage.py | 83 ++-- 5 files changed, 278 insertions(+), 909 deletions(-) delete mode 100644 krkn/scenario_plugins/native/network/kubernetes_functions.py delete mode 100644 krkn/scenario_plugins/native/pod_network_outage/kubernetes_functions.py diff --git a/krkn/scenario_plugins/native/network/ingress_shaping.py b/krkn/scenario_plugins/native/network/ingress_shaping.py index afc71b5e..9f61e2da 100644 --- a/krkn/scenario_plugins/native/network/ingress_shaping.py +++ b/krkn/scenario_plugins/native/network/ingress_shaping.py @@ -8,11 +8,9 @@ import re import random from traceback import format_exc from jinja2 import Environment, FileSystemLoader -from . import kubernetes_functions as kube_helper +from krkn_lib.k8s import KrknKubernetes import typing from arcaflow_plugin_sdk import validation, plugin -from kubernetes.client.api.core_v1_api import CoreV1Api as CoreV1Api -from kubernetes.client.api.batch_v1_api import BatchV1Api as BatchV1Api @dataclass @@ -150,7 +148,7 @@ class NetworkScenarioErrorOutput: ) -def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) -> str: +def get_default_interface(node: str, pod_template, kubecli: KrknKubernetes, image: str) -> str: """ Function that returns a random interface from a node @@ -162,20 +160,20 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) - - The YAML template used to instantiate a pod to query the node's interface - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client Returns: Default interface (string) belonging to the node """ pod_name_regex = str(random.randint(0, 10000)) - pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image)) + pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image)) logging.info("Creating pod to query interface on node %s" % node) - kube_helper.create_pod(cli, pod_body, "default", 300) + kubecli.create_pod(pod_body, "default", 300) pod_name = f"fedtools-{pod_name_regex}" try: cmd = ["ip", "r"] - output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default") + output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default") if not output: logging.error("Exception occurred while executing command in pod") @@ -191,13 +189,13 @@ def get_default_interface(node: str, pod_template, cli: CoreV1Api, image: str) - finally: logging.info("Deleting pod to query interface on node") - kube_helper.delete_pod(cli, pod_name, "default") + kubecli.delete_pod(pod_name, "default") return interfaces def verify_interface( - input_interface_list: typing.List[str], node: str, pod_template, cli: CoreV1Api, image: str + input_interface_list: typing.List[str], node: str, pod_template, kubecli: KrknKubernetes, image: str ) -> typing.List[str]: """ Function that verifies whether a list of interfaces is present in the node. @@ -214,21 +212,21 @@ def verify_interface( - The YAML template used to instantiate a pod to query the node's interfaces - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client Returns: The interface list for the node """ pod_name_regex = str(random.randint(0, 10000)) - pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image)) + pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image)) logging.info("Creating pod to query interface on node %s" % node) - kube_helper.create_pod(cli, pod_body, "default", 300) + kubecli.create_pod(pod_body, "default", 300) pod_name = f"fedtools-{pod_name_regex}" try: if input_interface_list == []: cmd = ["ip", "r"] - output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default") + output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default") if not output: logging.error("Exception occurred while executing command in pod") @@ -244,7 +242,7 @@ def verify_interface( else: cmd = ["ip", "-br", "addr", "show"] - output = kube_helper.exec_cmd_in_pod(cli, cmd, pod_name, "default") + output = kubecli.exec_cmd_in_pod(cmd, pod_name, "default") if not output: logging.error("Exception occurred while executing command in pod") @@ -267,7 +265,7 @@ def verify_interface( ) finally: logging.info("Deleting pod to query interface on node") - kube_helper.delete_pod(cli, pod_name, "default") + kubecli.delete_pod(pod_name, "default") return input_interface_list @@ -277,7 +275,7 @@ def get_node_interfaces( label_selector: str, instance_count: int, pod_template, - cli: CoreV1Api, + kubecli: KrknKubernetes, image: str ) -> typing.Dict[str, typing.List[str]]: """ @@ -305,8 +303,8 @@ def get_node_interfaces( - The YAML template used to instantiate a pod to query the node's interfaces - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client Returns: Filtered dictionary containing the test nodes and their test interfaces @@ -317,22 +315,22 @@ def get_node_interfaces( "If node names and interfaces aren't provided, " "then the label selector must be provided" ) - nodes = kube_helper.get_node(None, label_selector, instance_count, cli) + nodes = kubecli.get_node(None, label_selector, instance_count) node_interface_dict = {} for node in nodes: - node_interface_dict[node] = get_default_interface(node, pod_template, cli, image) + node_interface_dict[node] = get_default_interface(node, pod_template, kubecli, image) else: node_name_list = node_interface_dict.keys() filtered_node_list = [] for node in node_name_list: filtered_node_list.extend( - kube_helper.get_node(node, label_selector, instance_count, cli) + kubecli.get_node(node, label_selector, instance_count) ) for node in filtered_node_list: node_interface_dict[node] = verify_interface( - node_interface_dict[node], node, pod_template, cli, image + node_interface_dict[node], node, pod_template, kubecli, image ) return node_interface_dict @@ -344,11 +342,10 @@ def apply_ingress_filter( node: str, pod_template, job_template, - batch_cli: BatchV1Api, - cli: CoreV1Api, + kubecli: KrknKubernetes, create_interfaces: bool = True, param_selector: str = "all", - image:str = "quay.io/krkn-chaos/krkn:tools", + image: str = "quay.io/krkn-chaos/krkn:tools", ) -> str: """ Function that applies the filters to shape incoming traffic to @@ -374,11 +371,8 @@ def apply_ingress_filter( - The YAML template used to instantiate a job to apply and remove the filters on the interfaces - batch_cli - - Object to interact with Kubernetes Python client's BatchV1 API - - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client param_selector (string) - Used to specify what kind of filter to apply. Useful during @@ -394,7 +388,7 @@ def apply_ingress_filter( network_params = {param_selector: cfg.network_params[param_selector]} if create_interfaces: - create_virtual_interfaces(cli, interface_list, node, pod_template, image) + create_virtual_interfaces(kubecli, interface_list, node, pod_template, image) exec_cmd = get_ingress_cmd( interface_list, network_params, duration=cfg.test_duration @@ -403,7 +397,7 @@ def apply_ingress_filter( job_body = yaml.safe_load( job_template.render(jobname=str(hash(node))[:5], nodename=node, image=image, cmd=exec_cmd) ) - api_response = kube_helper.create_job(batch_cli, job_body) + api_response = kubecli.create_job(job_body) if api_response is None: raise Exception("Error creating job") @@ -412,15 +406,15 @@ def apply_ingress_filter( def create_virtual_interfaces( - cli: CoreV1Api, interface_list: typing.List[str], node: str, pod_template, image: str + kubecli: KrknKubernetes, interface_list: typing.List[str], node: str, pod_template, image: str ) -> None: """ Function that creates a privileged pod and uses it to create virtual interfaces on the node Args: - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client interface_list (List of strings) - The list of interfaces on the node for which virtual interfaces @@ -434,37 +428,34 @@ def create_virtual_interfaces( virtual interfaces on the node """ pod_name_regex = str(random.randint(0, 10000)) - pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image)) - kube_helper.create_pod(cli, pod_body, "default", 300) + pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image)) + kubecli.create_pod(pod_body, "default", 300) logging.info( "Creating {0} virtual interfaces on node {1} using a pod".format( len(interface_list), node ) ) pod_name = f"modtools-{pod_name_regex}" - create_ifb(cli, len(interface_list), pod_name) + create_ifb(kubecli, len(interface_list), pod_name) logging.info("Deleting pod used to create virtual interfaces") - kube_helper.delete_pod(cli, pod_name, "default") + kubecli.delete_pod(pod_name, "default") def delete_virtual_interfaces( - cli: CoreV1Api, node_list: typing.List[str], pod_template, image: str + kubecli: KrknKubernetes, node_list: typing.List[str], pod_template, image: str ): """ Function that creates a privileged pod and uses it to delete all virtual interfaces on the specified nodes Args: - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client node_list (List of strings) - The list of nodes on which the list of virtual interfaces are to be deleted - node (string) - - The node on which the virtual interfaces are created - pod_template (jinja2.environment.Template)) - The YAML template used to instantiate a pod to delete virtual interfaces on the node @@ -472,46 +463,45 @@ def delete_virtual_interfaces( for node in node_list: pod_name_regex = str(random.randint(0, 10000)) - pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex,nodename=node, image=image)) - kube_helper.create_pod(cli, pod_body, "default", 300) + pod_body = yaml.safe_load(pod_template.render(regex_name=pod_name_regex, nodename=node, image=image)) + kubecli.create_pod(pod_body, "default", 300) logging.info("Deleting all virtual interfaces on node {0}".format(node)) pod_name = f"modtools-{pod_name_regex}" - delete_ifb(cli, pod_name) - kube_helper.delete_pod(cli, pod_name, "default") + delete_ifb(kubecli, pod_name) + kubecli.delete_pod(pod_name, "default") -def create_ifb(cli: CoreV1Api, number: int, pod_name: str): +def create_ifb(kubecli: KrknKubernetes, number: int, pod_name: str): """ Function that creates virtual interfaces in a pod. Makes use of modprobe commands """ - exec_command = ["chroot", "/host", "modprobe", "ifb", "numifbs=" + str(number)] - kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default") + exec_command = ["/host", "modprobe", "ifb", "numifbs=" + str(number)] + kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot") for i in range(0, number): - exec_command = ["chroot", "/host", "ip", "link", "set", "dev"] - exec_command += ["ifb" + str(i), "up"] - kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default") + exec_command = ["/host", "ip", "link", "set", "dev", "ifb" + str(i), "up"] + kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot") -def delete_ifb(cli: CoreV1Api, pod_name: str): +def delete_ifb(kubecli: KrknKubernetes, pod_name: str): """ Function that deletes all virtual interfaces in a pod. Makes use of modprobe command """ - exec_command = ["chroot", "/host", "modprobe", "-r", "ifb"] - kube_helper.exec_cmd_in_pod(cli, exec_command, pod_name, "default") + exec_command = ["/host", "modprobe", "-r", "ifb"] + kubecli.exec_cmd_in_pod(exec_command, pod_name, "default", base_command="chroot") -def get_job_pods(cli: CoreV1Api, api_response): +def get_job_pods(kubecli: KrknKubernetes, api_response): """ Function that gets the pod corresponding to the job Args: - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client api_response - The API response for the job status @@ -522,22 +512,22 @@ def get_job_pods(cli: CoreV1Api, api_response): controllerUid = api_response.metadata.labels["controller-uid"] pod_label_selector = "controller-uid=" + controllerUid - pods_list = kube_helper.list_pods( - cli, label_selector=pod_label_selector, namespace="default" + pods_list = kubecli.list_pods( + label_selector=pod_label_selector, namespace="default" ) return pods_list[0] def wait_for_job( - batch_cli: BatchV1Api, job_list: typing.List[str], timeout: int = 300 + kubecli: KrknKubernetes, job_list: typing.List[str], timeout: int = 300 ) -> None: """ Function that waits for a list of jobs to finish within a time period Args: - batch_cli (BatchV1Api) - - Object to interact with Kubernetes Python client's BatchV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client job_list (List of strings) - The list of jobs to check for completion @@ -552,9 +542,7 @@ def wait_for_job( while count != job_len: for job_name in job_list: try: - api_response = kube_helper.get_job_status( - batch_cli, job_name, namespace="default" - ) + api_response = kubecli.get_job_status(job_name, namespace="default") if ( api_response.status.succeeded is not None or api_response.status.failed is not None @@ -571,16 +559,13 @@ def wait_for_job( time.sleep(5) -def delete_jobs(cli: CoreV1Api, batch_cli: BatchV1Api, job_list: typing.List[str]): +def delete_jobs(kubecli: KrknKubernetes, job_list: typing.List[str]): """ Function that deletes jobs Args: - cli (CoreV1Api) - - Object to interact with Kubernetes Python client's CoreV1 API - - batch_cli (BatchV1Api) - - Object to interact with Kubernetes Python client's BatchV1 API + kubecli (KrknKubernetes) + - Object to interact with Kubernetes Python client job_list (List of strings) - The list of jobs to delete @@ -588,23 +573,19 @@ def delete_jobs(cli: CoreV1Api, batch_cli: BatchV1Api, job_list: typing.List[str for job_name in job_list: try: - api_response = kube_helper.get_job_status( - batch_cli, job_name, namespace="default" - ) + api_response = kubecli.get_job_status(job_name, namespace="default") if api_response.status.failed is not None: - pod_name = get_job_pods(cli, api_response) - pod_stat = kube_helper.read_pod(cli, name=pod_name, namespace="default") + pod_name = get_job_pods(kubecli, api_response) + pod_stat = kubecli.read_pod(name=pod_name, namespace="default") logging.error(pod_stat.status.container_statuses) - pod_log_response = kube_helper.get_pod_log( - cli, name=pod_name, namespace="default" + pod_log_response = kubecli.get_pod_log( + name=pod_name, namespace="default" ) pod_log = pod_log_response.data.decode("utf-8") logging.error(pod_log) except Exception as e: logging.warning("Exception in getting job status: %s" % str(e)) - api_response = kube_helper.delete_job( - batch_cli, name=job_name, namespace="default" - ) + kubecli.delete_job(name=job_name, namespace="default") def get_ingress_cmd( @@ -715,7 +696,7 @@ def network_chaos( job_template = env.get_template("job.j2") pod_interface_template = env.get_template("pod_interface.j2") pod_module_template = env.get_template("pod_module.j2") - cli, batch_cli = kube_helper.setup_kubernetes(cfg.kubeconfig_path) + kubecli = KrknKubernetes(kubeconfig_path=cfg.kubeconfig_path) test_image = cfg.image logging.info("Starting Ingress Network Chaos") try: @@ -724,7 +705,7 @@ def network_chaos( cfg.label_selector, cfg.instance_count, pod_interface_template, - cli, + kubecli, test_image ) except Exception: @@ -741,13 +722,12 @@ def network_chaos( node, pod_module_template, job_template, - batch_cli, - cli, - test_image + kubecli, + image=test_image ) ) logging.info("Waiting for parallel job to finish") - wait_for_job(batch_cli, job_list[:], cfg.test_duration + 100) + wait_for_job(kubecli, job_list[:], cfg.test_duration + 100) elif cfg.execution_type == "serial": create_interfaces = True @@ -760,22 +740,20 @@ def network_chaos( node, pod_module_template, job_template, - batch_cli, - cli, + kubecli, create_interfaces=create_interfaces, param_selector=param, image=test_image ) ) logging.info("Waiting for serial job to finish") - wait_for_job(batch_cli, job_list[:], cfg.test_duration + 100) + wait_for_job(kubecli, job_list[:], cfg.test_duration + 100) logging.info("Deleting jobs") - delete_jobs(cli, batch_cli, job_list[:]) + delete_jobs(kubecli, job_list[:]) job_list = [] - + create_interfaces = False else: - return "error", NetworkScenarioErrorOutput( "Invalid execution type - serial and parallel are " "the only accepted types" @@ -790,6 +768,6 @@ def network_chaos( logging.error("Ingress Network Chaos exiting due to Exception - %s" % e) return "error", NetworkScenarioErrorOutput(format_exc()) finally: - delete_virtual_interfaces(cli, node_interface_dict.keys(), pod_module_template, test_image) + delete_virtual_interfaces(kubecli, node_interface_dict.keys(), pod_module_template, test_image) logging.info("Deleting jobs(if any)") - delete_jobs(cli, batch_cli, job_list[:]) + delete_jobs(kubecli, job_list[:]) diff --git a/krkn/scenario_plugins/native/network/kubernetes_functions.py b/krkn/scenario_plugins/native/network/kubernetes_functions.py deleted file mode 100644 index ff91d529..00000000 --- a/krkn/scenario_plugins/native/network/kubernetes_functions.py +++ /dev/null @@ -1,284 +0,0 @@ -from kubernetes import config, client -from kubernetes.client.rest import ApiException -from kubernetes.stream import stream -import sys -import time -import logging -import random - -def setup_kubernetes(kubeconfig_path): - """ - Sets up the Kubernetes client - """ - - if kubeconfig_path is None: - kubeconfig_path = config.KUBE_CONFIG_DEFAULT_LOCATION - config.load_kube_config(kubeconfig_path) - cli = client.CoreV1Api() - batch_cli = client.BatchV1Api() - - return cli, batch_cli - - -def create_job(batch_cli, body, namespace="default"): - """ - Function used to create a job from a YAML config - """ - - try: - api_response = batch_cli.create_namespaced_job(body=body, namespace=namespace) - return api_response - except ApiException as api: - logging.warning( - "Exception when calling \ - BatchV1Api->create_job: %s" - % api - ) - if api.status == 409: - logging.warning("Job already present") - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->create_namespaced_job: %s" - % e - ) - raise - - -def delete_pod(cli, name, namespace): - """ - Function that deletes a pod and waits until deletion is complete - """ - - try: - cli.delete_namespaced_pod(name=name, namespace=namespace) - while cli.read_namespaced_pod(name=name, namespace=namespace): - time.sleep(1) - except ApiException as e: - if e.status == 404: - logging.info("Pod deleted") - else: - logging.error("Failed to delete pod %s" % e) - raise e - - -def create_pod(cli, body, namespace, timeout=120): - """ - Function used to create a pod from a YAML config - """ - - try: - pod_stat = None - pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace) - end_time = time.time() + timeout - while True: - pod_stat = cli.read_namespaced_pod(name=body["metadata"]["name"], namespace=namespace) - if pod_stat.status.phase == "Running": - break - if time.time() > end_time: - raise Exception("Starting pod failed") - time.sleep(1) - except Exception as e: - logging.error("Pod creation failed %s" % e) - if pod_stat: - logging.error(pod_stat.status.container_statuses) - delete_pod(cli, body["metadata"]["name"], namespace) - sys.exit(1) - - -def exec_cmd_in_pod(cli, command, pod_name, namespace, container=None): - """ - Function used to execute a command in a running pod - """ - - exec_command = command - try: - if container: - ret = stream( - cli.connect_get_namespaced_pod_exec, - pod_name, - namespace, - container=container, - command=exec_command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - else: - ret = stream( - cli.connect_get_namespaced_pod_exec, - pod_name, - namespace, - command=exec_command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - except Exception as e: - return False - - return ret - - -def create_ifb(cli, number, pod_name): - """ - Function that creates virtual interfaces in a pod. Makes use of modprobe commands - """ - - exec_command = ['chroot', '/host', 'modprobe', 'ifb','numifbs=' + str(number)] - resp = exec_cmd_in_pod(cli, exec_command, pod_name, 'default') - - for i in range(0, number): - exec_command = ['chroot', '/host','ip','link','set','dev'] - exec_command+= ['ifb' + str(i), 'up'] - resp = exec_cmd_in_pod(cli, exec_command, pod_name, 'default') - - -def delete_ifb(cli, pod_name): - """ - Function that deletes all virtual interfaces in a pod. Makes use of modprobe command - """ - - exec_command = ['chroot', '/host', 'modprobe', '-r', 'ifb'] - resp = exec_cmd_in_pod(cli, exec_command, pod_name, 'default') - - -def list_pods(cli, namespace, label_selector=None): - """ - Function used to list pods in a given namespace and having a certain label - """ - - pods = [] - try: - if label_selector: - ret = cli.list_namespaced_pod(namespace, pretty=True, label_selector=label_selector) - else: - ret = cli.list_namespaced_pod(namespace, pretty=True) - except ApiException as e: - logging.error( - "Exception when calling \ - CoreV1Api->list_namespaced_pod: %s\n" - % e - ) - raise e - for pod in ret.items: - pods.append(pod.metadata.name) - - return pods - - -def get_job_status(batch_cli, name, namespace="default"): - """ - Function that retrieves the status of a running job in a given namespace - """ - - try: - return batch_cli.read_namespaced_job_status(name=name, namespace=namespace) - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->read_namespaced_job_status: %s" - % e - ) - raise - - -def get_pod_log(cli, name, namespace="default"): - """ - Function that retrieves the logs of a running pod in a given namespace - """ - - return cli.read_namespaced_pod_log( - name=name, namespace=namespace, _return_http_data_only=True, _preload_content=False - ) - - -def read_pod(cli, name, namespace="default"): - """ - Function that retrieves the info of a running pod in a given namespace - """ - - return cli.read_namespaced_pod(name=name, namespace=namespace) - - - -def delete_job(batch_cli, name, namespace="default"): - """ - Deletes a job with the input name and namespace - """ - - try: - api_response = batch_cli.delete_namespaced_job( - name=name, - namespace=namespace, - body=client.V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=0), - ) - logging.debug("Job deleted. status='%s'" % str(api_response.status)) - return api_response - except ApiException as api: - logging.warning( - "Exception when calling \ - BatchV1Api->create_namespaced_job: %s" - % api - ) - logging.warning("Job already deleted\n") - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->delete_namespaced_job: %s\n" - % e - ) - sys.exit(1) - - -def list_ready_nodes(cli, label_selector=None): - """ - Returns a list of ready nodes - """ - - nodes = [] - try: - if label_selector: - ret = cli.list_node(pretty=True, label_selector=label_selector) - else: - ret = cli.list_node(pretty=True) - except ApiException as e: - logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) - raise e - for node in ret.items: - for cond in node.status.conditions: - if str(cond.type) == "Ready" and str(cond.status) == "True": - nodes.append(node.metadata.name) - - return nodes - - -def get_node(node_name, label_selector, instance_kill_count, cli): - """ - Returns active node(s) on which the scenario can be performed - """ - - if node_name in list_ready_nodes(cli): - return [node_name] - elif node_name: - logging.info( - "Node with provided node_name does not exist or the node might " - "be in NotReady state." - ) - nodes = list_ready_nodes(cli, label_selector) - if not nodes: - raise Exception("Ready nodes with the provided label selector do not exist") - logging.info( - "Ready nodes with the label selector %s: %s" % (label_selector, nodes) - ) - number_of_nodes = len(nodes) - if instance_kill_count == number_of_nodes: - return nodes - nodes_to_return = [] - for i in range(instance_kill_count): - node_to_add = nodes[random.randint(0, len(nodes) - 1)] - nodes_to_return.append(node_to_add) - nodes.remove(node_to_add) - return nodes_to_return diff --git a/krkn/scenario_plugins/native/pod_network_outage/kubernetes_functions.py b/krkn/scenario_plugins/native/pod_network_outage/kubernetes_functions.py deleted file mode 100644 index aaa4b4e7..00000000 --- a/krkn/scenario_plugins/native/pod_network_outage/kubernetes_functions.py +++ /dev/null @@ -1,274 +0,0 @@ -from kubernetes import config, client -from kubernetes.client.rest import ApiException -from kubernetes.stream import stream -import sys -import time -import logging -import random - - -def setup_kubernetes(kubeconfig_path) -> client.ApiClient: - """ - Sets up the Kubernetes client - """ - if kubeconfig_path is None: - kubeconfig_path = config.KUBE_CONFIG_DEFAULT_LOCATION - client_config = config.load_kube_config(kubeconfig_path) - return client.ApiClient(client_config) - - -def create_job(batch_cli, body, namespace="default"): - """ - Function used to create a job from a YAML config - """ - - try: - api_response = batch_cli.create_namespaced_job(body=body, namespace=namespace) - return api_response - except ApiException as api: - logging.warning( - "Exception when calling \ - BatchV1Api->create_job: %s" - % api - ) - if api.status == 409: - logging.warning("Job already present") - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->create_namespaced_job: %s" - % e - ) - raise - - -def delete_pod(cli, name, namespace): - """ - Function that deletes a pod and waits until deletion is complete - """ - - try: - cli.delete_namespaced_pod(name=name, namespace=namespace) - while cli.read_namespaced_pod(name=name, namespace=namespace): - time.sleep(1) - except ApiException as e: - if e.status == 404: - logging.info("Pod deleted") - else: - logging.error("Failed to delete pod %s" % e) - raise e - - -def create_pod(cli, body, namespace, timeout=120): - """ - Function used to create a pod from a YAML config - """ - - try: - pod_stat = None - pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace) - end_time = time.time() + timeout - while True: - pod_stat = cli.read_namespaced_pod( - name=body["metadata"]["name"], namespace=namespace - ) - if pod_stat.status.phase == "Running": - break - if time.time() > end_time: - raise Exception("Starting pod failed") - time.sleep(1) - except Exception as e: - logging.error("Pod creation failed %s" % e) - if pod_stat: - logging.error(pod_stat.status.container_statuses) - delete_pod(cli, body["metadata"]["name"], namespace) - sys.exit(1) - - -def exec_cmd_in_pod(cli, command, pod_name, namespace, container=None): - """ - Function used to execute a command in a running pod - """ - - exec_command = command - try: - if container: - ret = stream( - cli.connect_get_namespaced_pod_exec, - pod_name, - namespace, - container=container, - command=exec_command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - else: - ret = stream( - cli.connect_get_namespaced_pod_exec, - pod_name, - namespace, - command=exec_command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - except BaseException: - return False - - return ret - - -def list_pods(cli, namespace, label_selector=None, exclude_label=None): - """ - Function used to list pods in a given namespace and having a certain label and excluding pods with exclude_label - and excluding pods with exclude_label - """ - - pods = [] - try: - if label_selector: - ret = cli.list_namespaced_pod( - namespace, pretty=True, label_selector=label_selector - ) - else: - ret = cli.list_namespaced_pod(namespace, pretty=True) - except ApiException as e: - logging.error( - "Exception when calling \ - CoreV1Api->list_namespaced_pod: %s\n" - % e - ) - raise e - - for pod in ret.items: - # Skip pods with the exclude label if specified - if exclude_label and pod.metadata.labels: - exclude_key, exclude_value = exclude_label.split("=", 1) - if ( - exclude_key in pod.metadata.labels - and pod.metadata.labels[exclude_key] == exclude_value - ): - continue - pods.append(pod.metadata.name) - - return pods - - -def get_job_status(batch_cli, name, namespace="default"): - """ - Function that retrieves the status of a running job in a given namespace - """ - - try: - return batch_cli.read_namespaced_job_status(name=name, namespace=namespace) - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->read_namespaced_job_status: %s" - % e - ) - raise - - -def get_pod_log(cli, name, namespace="default"): - """ - Function that retrieves the logs of a running pod in a given namespace - """ - - return cli.read_namespaced_pod_log( - name=name, - namespace=namespace, - _return_http_data_only=True, - _preload_content=False, - ) - - -def read_pod(cli, name, namespace="default"): - """ - Function that retrieves the info of a running pod in a given namespace - """ - - return cli.read_namespaced_pod(name=name, namespace=namespace) - - -def delete_job(batch_cli, name, namespace="default"): - """ - Deletes a job with the input name and namespace - """ - - try: - api_response = batch_cli.delete_namespaced_job( - name=name, - namespace=namespace, - body=client.V1DeleteOptions( - propagation_policy="Foreground", grace_period_seconds=0 - ), - ) - logging.debug("Job deleted. status='%s'" % str(api_response.status)) - return api_response - except ApiException as api: - logging.warning( - "Exception when calling \ - BatchV1Api->create_namespaced_job: %s" - % api - ) - logging.warning("Job already deleted\n") - except Exception as e: - logging.error( - "Exception when calling \ - BatchV1Api->delete_namespaced_job: %s\n" - % e - ) - sys.exit(1) - - -def list_ready_nodes(cli, label_selector=None): - """ - Returns a list of ready nodes - """ - - nodes = [] - try: - if label_selector: - ret = cli.list_node(pretty=True, label_selector=label_selector) - else: - ret = cli.list_node(pretty=True) - except ApiException as e: - logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) - raise e - for node in ret.items: - for cond in node.status.conditions: - if str(cond.type) == "Ready" and str(cond.status) == "True": - nodes.append(node.metadata.name) - - return nodes - - -def get_node(node_name, label_selector, instance_kill_count, cli): - """ - Returns active node(s) on which the scenario can be performed - """ - - if node_name in list_ready_nodes(cli): - return [node_name] - elif node_name: - logging.info( - "Node with provided node_name does not exist or the node might " - "be in NotReady state." - ) - nodes = list_ready_nodes(cli, label_selector) - if not nodes: - raise Exception("Ready nodes with the provided label selector do not exist") - logging.info("Ready nodes with the label selector %s: %s" % (label_selector, nodes)) - number_of_nodes = len(nodes) - if instance_kill_count == number_of_nodes: - return nodes - nodes_to_return = [] - for i in range(instance_kill_count): - node_to_add = nodes[random.randint(0, len(nodes) - 1)] - nodes_to_return.append(node_to_add) - nodes.remove(node_to_add) - return nodes_to_return diff --git a/tests/test_ingress_network_plugin.py b/tests/test_ingress_network_plugin.py index 3a22e71a..536565ba 100644 --- a/tests/test_ingress_network_plugin.py +++ b/tests/test_ingress_network_plugin.py @@ -1,3 +1,31 @@ +#!/usr/bin/env python3 + +""" +Test suite for ingress network shaping plugin + +This test suite covers the ingress_shaping module functions using mocks +to avoid needing actual Kubernetes infrastructure. + +Test Coverage: +- Serialization of config and output objects +- Interface discovery: get_default_interface, verify_interface, get_node_interfaces +- Virtual interface management: create_ifb, delete_ifb, create_virtual_interfaces, delete_virtual_interfaces +- Job management: get_job_pods, wait_for_job, delete_jobs +- Traffic shaping command generation: get_ingress_cmd +- Filter application: apply_ingress_filter +- End-to-end network_chaos flows: parallel, serial, error cases + +IMPORTANT: These tests use mocking and do NOT require any Kubernetes cluster. +All API calls are mocked via unittest.mock. + +Usage: + python -m unittest tests/test_ingress_network_plugin.py -v + + # Run with coverage + python -m coverage run -a -m unittest tests/test_ingress_network_plugin.py -v + +Assisted By: Claude Code +""" import unittest from unittest.mock import Mock, patch from arcaflow_plugin_sdk import plugin @@ -40,168 +68,144 @@ class NetworkScenariosTest(unittest.TestCase): self.fail, ) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_get_default_interface(self, mock_kube_helper): + def test_get_default_interface(self): """Test getting default interface from a node""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml_content" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.exec_cmd_in_pod.return_value = ( + mock_kubecli.create_pod.return_value = None + mock_kubecli.exec_cmd_in_pod.return_value = ( "default via 192.168.1.1 dev eth0 proto dhcp metric 100\n" "172.17.0.0/16 dev docker0 proto kernel scope link src 172.17.0.1" ) - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test result = ingress_shaping.get_default_interface( node="test-node", pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions self.assertEqual(result, ["eth0"]) - mock_kube_helper.create_pod.assert_called_once() - mock_kube_helper.exec_cmd_in_pod.assert_called_once() - mock_kube_helper.delete_pod.assert_called_once() + mock_kubecli.create_pod.assert_called_once() + mock_kubecli.exec_cmd_in_pod.assert_called_once() + mock_kubecli.delete_pod.assert_called_once() - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_verify_interface_with_empty_list(self, mock_kube_helper): + def test_verify_interface_with_empty_list(self): """Test verifying interface when input list is empty""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml_content" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.exec_cmd_in_pod.return_value = ( + mock_kubecli.create_pod.return_value = None + mock_kubecli.exec_cmd_in_pod.return_value = ( "default via 192.168.1.1 dev eth0 proto dhcp metric 100\n" ) - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test result = ingress_shaping.verify_interface( input_interface_list=[], node="test-node", pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions self.assertEqual(result, ["eth0"]) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_verify_interface_with_valid_interfaces(self, mock_kube_helper): + def test_verify_interface_with_valid_interfaces(self): """Test verifying interface with valid interface list""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml_content" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.exec_cmd_in_pod.return_value = ( + mock_kubecli.create_pod.return_value = None + mock_kubecli.exec_cmd_in_pod.return_value = ( "eth0 UP 192.168.1.10/24\n" "eth1 UP 10.0.0.5/24\n" "lo UNKNOWN 127.0.0.1/8\n" ) - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test result = ingress_shaping.verify_interface( input_interface_list=["eth0", "eth1"], node="test-node", pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions self.assertEqual(result, ["eth0", "eth1"]) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_verify_interface_with_invalid_interface(self, mock_kube_helper): + def test_verify_interface_with_invalid_interface(self): """Test verifying interface with an interface that doesn't exist""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml_content" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.exec_cmd_in_pod.return_value = ( + mock_kubecli.create_pod.return_value = None + mock_kubecli.exec_cmd_in_pod.return_value = ( "eth0 UP 192.168.1.10/24\n" "lo UNKNOWN 127.0.0.1/8\n" ) - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test - should raise exception with self.assertRaises(Exception) as context: ingress_shaping.verify_interface( input_interface_list=["eth0", "eth99"], node="test-node", pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) self.assertIn("Interface eth99 not found", str(context.exception)) @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_default_interface') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_get_node_interfaces_with_label_selector(self, mock_kube_helper, mock_get_default_interface): + def test_get_node_interfaces_with_label_selector(self, mock_get_default_interface): """Test getting node interfaces using label selector""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() - mock_kube_helper.get_node.return_value = ["node1", "node2"] + mock_kubecli.get_node.return_value = ["node1", "node2"] mock_get_default_interface.return_value = ["eth0"] - # Test result = ingress_shaping.get_node_interfaces( node_interface_dict=None, label_selector="node-role.kubernetes.io/worker", instance_count=2, pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions self.assertEqual(result, {"node1": ["eth0"], "node2": ["eth0"]}) self.assertEqual(mock_get_default_interface.call_count, 2) @patch('krkn.scenario_plugins.native.network.ingress_shaping.verify_interface') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_get_node_interfaces_with_node_dict(self, mock_kube_helper, mock_verify_interface): + def test_get_node_interfaces_with_node_dict(self, mock_verify_interface): """Test getting node interfaces with provided node interface dictionary""" - # Setup mocks - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() - mock_kube_helper.get_node.return_value = ["node1"] + mock_kubecli.get_node.return_value = ["node1"] mock_verify_interface.return_value = ["eth0", "eth1"] - # Test result = ingress_shaping.get_node_interfaces( node_interface_dict={"node1": ["eth0", "eth1"]}, label_selector=None, instance_count=1, pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions self.assertEqual(result, {"node1": ["eth0", "eth1"]}) mock_verify_interface.assert_called_once() - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_get_node_interfaces_no_selector_no_dict(self, mock_kube_helper): + def test_get_node_interfaces_no_selector_no_dict(self): """Test that exception is raised when both node dict and label selector are missing""" - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() with self.assertRaises(Exception) as context: @@ -210,148 +214,124 @@ class NetworkScenariosTest(unittest.TestCase): label_selector=None, instance_count=1, pod_template=mock_pod_template, - cli=mock_cli, + kubecli=mock_kubecli, image="quay.io/krkn-chaos/krkn:tools" ) self.assertIn("label selector must be provided", str(context.exception)) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_create_ifb(self, mock_kube_helper): + def test_create_ifb(self): """Test creating virtual interfaces""" - mock_cli = Mock() - mock_kube_helper.exec_cmd_in_pod.return_value = None + mock_kubecli = Mock() + mock_kubecli.exec_cmd_in_pod.return_value = None - # Test - ingress_shaping.create_ifb(cli=mock_cli, number=2, pod_name="test-pod") + ingress_shaping.create_ifb(kubecli=mock_kubecli, number=2, pod_name="test-pod") - # Assertions # Should call modprobe once and ip link set for each interface - self.assertEqual(mock_kube_helper.exec_cmd_in_pod.call_count, 3) + self.assertEqual(mock_kubecli.exec_cmd_in_pod.call_count, 3) - # Verify modprobe call - first_call = mock_kube_helper.exec_cmd_in_pod.call_args_list[0] - self.assertIn("modprobe", first_call[0][1]) - self.assertIn("numifbs=2", first_call[0][1]) + # Verify modprobe call — cmd is now the first positional arg + first_call = mock_kubecli.exec_cmd_in_pod.call_args_list[0] + self.assertIn("modprobe", first_call[0][0]) + self.assertIn("numifbs=2", first_call[0][0]) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_delete_ifb(self, mock_kube_helper): + def test_delete_ifb(self): """Test deleting virtual interfaces""" - mock_cli = Mock() - mock_kube_helper.exec_cmd_in_pod.return_value = None + mock_kubecli = Mock() + mock_kubecli.exec_cmd_in_pod.return_value = None - # Test - ingress_shaping.delete_ifb(cli=mock_cli, pod_name="test-pod") + ingress_shaping.delete_ifb(kubecli=mock_kubecli, pod_name="test-pod") - # Assertions - mock_kube_helper.exec_cmd_in_pod.assert_called_once() - call_args = mock_kube_helper.exec_cmd_in_pod.call_args[0][1] + mock_kubecli.exec_cmd_in_pod.assert_called_once() + # cmd is now the first positional arg + call_args = mock_kubecli.exec_cmd_in_pod.call_args[0][0] self.assertIn("modprobe", call_args) self.assertIn("-r", call_args) self.assertIn("ifb", call_args) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_get_job_pods(self, mock_kube_helper): + def test_get_job_pods(self): """Test getting pods associated with a job""" - mock_cli = Mock() + mock_kubecli = Mock() mock_api_response = Mock() mock_api_response.metadata.labels = {"controller-uid": "test-uid-123"} + mock_kubecli.list_pods.return_value = ["pod1", "pod2"] - mock_kube_helper.list_pods.return_value = ["pod1", "pod2"] + result = ingress_shaping.get_job_pods(kubecli=mock_kubecli, api_response=mock_api_response) - # Test - result = ingress_shaping.get_job_pods(cli=mock_cli, api_response=mock_api_response) - - # Assertions self.assertEqual(result, "pod1") - mock_kube_helper.list_pods.assert_called_once_with( - mock_cli, + mock_kubecli.list_pods.assert_called_once_with( label_selector="controller-uid=test-uid-123", namespace="default" ) @patch('time.sleep', return_value=None) @patch('time.time') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_wait_for_job_success(self, mock_kube_helper, mock_time, mock_sleep): + def test_wait_for_job_success(self, mock_time, mock_sleep): """Test waiting for jobs to complete successfully""" - mock_batch_cli = Mock() - mock_time.side_effect = [0, 10, 20] # Simulate time progression + mock_kubecli = Mock() + mock_time.side_effect = [0, 10, 20] - # First job succeeds mock_response1 = Mock() mock_response1.status.succeeded = 1 mock_response1.status.failed = None - # Second job succeeds mock_response2 = Mock() mock_response2.status.succeeded = 1 mock_response2.status.failed = None - mock_kube_helper.get_job_status.side_effect = [mock_response1, mock_response2] + mock_kubecli.get_job_status.side_effect = [mock_response1, mock_response2] - # Test ingress_shaping.wait_for_job( - batch_cli=mock_batch_cli, + kubecli=mock_kubecli, job_list=["job1", "job2"], timeout=300 ) - # Assertions - self.assertEqual(mock_kube_helper.get_job_status.call_count, 2) + self.assertEqual(mock_kubecli.get_job_status.call_count, 2) @patch('time.sleep', return_value=None) @patch('time.time') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_wait_for_job_timeout(self, mock_kube_helper, mock_time, mock_sleep): + def test_wait_for_job_timeout(self, mock_time, mock_sleep): """Test waiting for jobs times out""" - mock_batch_cli = Mock() - mock_time.side_effect = [0, 350] # Simulate timeout + mock_kubecli = Mock() + mock_time.side_effect = [0, 350] mock_response = Mock() mock_response.status.succeeded = None mock_response.status.failed = None - mock_kube_helper.get_job_status.return_value = mock_response + mock_kubecli.get_job_status.return_value = mock_response - # Test - should raise exception with self.assertRaises(Exception) as context: ingress_shaping.wait_for_job( - batch_cli=mock_batch_cli, + kubecli=mock_kubecli, job_list=["job1"], timeout=300 ) self.assertIn("timeout", str(context.exception)) - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_delete_jobs(self, mock_kube_helper): + def test_delete_jobs(self): """Test deleting jobs""" - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli = Mock() mock_response = Mock() mock_response.status.failed = None - mock_kube_helper.get_job_status.return_value = mock_response - mock_kube_helper.delete_job.return_value = None + mock_kubecli.get_job_status.return_value = mock_response + mock_kubecli.delete_job.return_value = None - # Test ingress_shaping.delete_jobs( - cli=mock_cli, - batch_cli=mock_batch_cli, + kubecli=mock_kubecli, job_list=["job1", "job2"] ) - # Assertions - self.assertEqual(mock_kube_helper.get_job_status.call_count, 2) - self.assertEqual(mock_kube_helper.delete_job.call_count, 2) + self.assertEqual(mock_kubecli.get_job_status.call_count, 2) + self.assertEqual(mock_kubecli.delete_job.call_count, 2) @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_job_pods') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_delete_jobs_with_failed_job(self, mock_kube_helper, mock_get_job_pods): + def test_delete_jobs_with_failed_job(self, mock_get_job_pods): """Test deleting jobs when one has failed""" - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli = Mock() mock_response = Mock() mock_response.status.failed = 1 @@ -362,22 +342,19 @@ class NetworkScenariosTest(unittest.TestCase): mock_log_response = Mock() mock_log_response.data.decode.return_value = "Error log content" - mock_kube_helper.get_job_status.return_value = mock_response + mock_kubecli.get_job_status.return_value = mock_response mock_get_job_pods.return_value = "failed-pod" - mock_kube_helper.read_pod.return_value = mock_pod_status - mock_kube_helper.get_pod_log.return_value = mock_log_response - mock_kube_helper.delete_job.return_value = None + mock_kubecli.read_pod.return_value = mock_pod_status + mock_kubecli.get_pod_log.return_value = mock_log_response + mock_kubecli.delete_job.return_value = None - # Test ingress_shaping.delete_jobs( - cli=mock_cli, - batch_cli=mock_batch_cli, + kubecli=mock_kubecli, job_list=["failed-job"] ) - # Assertions - mock_kube_helper.read_pod.assert_called_once() - mock_kube_helper.get_pod_log.assert_called_once() + mock_kubecli.read_pod.assert_called_once() + mock_kubecli.get_pod_log.assert_called_once() def test_get_ingress_cmd_basic(self): """Test generating ingress traffic shaping commands""" @@ -387,7 +364,6 @@ class NetworkScenariosTest(unittest.TestCase): duration=120 ) - # Assertions self.assertIn("tc qdisc add dev eth0 handle ffff: ingress", result) self.assertIn("tc filter add dev eth0", result) self.assertIn("ifb0", result) @@ -403,7 +379,6 @@ class NetworkScenariosTest(unittest.TestCase): duration=120 ) - # Assertions self.assertIn("eth0", result) self.assertIn("eth1", result) self.assertIn("ifb0", result) @@ -423,7 +398,6 @@ class NetworkScenariosTest(unittest.TestCase): duration=120 ) - # Assertions self.assertIn("delay 50ms", result) self.assertIn("loss 0.02", result) self.assertIn("rate 100mbit", result) @@ -442,12 +416,9 @@ class NetworkScenariosTest(unittest.TestCase): @patch('yaml.safe_load') @patch('krkn.scenario_plugins.native.network.ingress_shaping.create_virtual_interfaces') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_ingress_cmd') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_apply_ingress_filter(self, mock_kube_helper, mock_get_cmd, mock_create_virtual, mock_yaml): + def test_apply_ingress_filter(self, mock_get_cmd, mock_create_virtual, mock_yaml): """Test applying ingress filters to a node""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_job_template = Mock() mock_job_template.render.return_value = "job_yaml" @@ -460,75 +431,66 @@ class NetworkScenariosTest(unittest.TestCase): mock_yaml.return_value = {"metadata": {"name": "test-job"}} mock_get_cmd.return_value = "tc commands" - mock_kube_helper.create_job.return_value = Mock() + mock_kubecli.create_job.return_value = Mock() - # Test result = ingress_shaping.apply_ingress_filter( cfg=mock_cfg, interface_list=["eth0"], node="node1", pod_template=mock_pod_template, job_template=mock_job_template, - batch_cli=mock_batch_cli, - cli=mock_cli, + kubecli=mock_kubecli, create_interfaces=True, param_selector="all", image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions mock_create_virtual.assert_called_once() mock_get_cmd.assert_called_once() - mock_kube_helper.create_job.assert_called_once() + mock_kubecli.create_job.assert_called_once() self.assertEqual(result, "test-job") - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_create_virtual_interfaces(self, mock_kube_helper): + def test_create_virtual_interfaces(self): """Test creating virtual interfaces on a node""" - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.exec_cmd_in_pod.return_value = None - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.create_pod.return_value = None + mock_kubecli.exec_cmd_in_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test ingress_shaping.create_virtual_interfaces( - cli=mock_cli, + kubecli=mock_kubecli, interface_list=["eth0", "eth1"], node="test-node", pod_template=mock_pod_template, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions - mock_kube_helper.create_pod.assert_called_once() - mock_kube_helper.delete_pod.assert_called_once() + mock_kubecli.create_pod.assert_called_once() + mock_kubecli.delete_pod.assert_called_once() @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_ifb') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') - def test_delete_virtual_interfaces(self, mock_kube_helper, mock_delete_ifb): + def test_delete_virtual_interfaces(self, mock_delete_ifb): """Test deleting virtual interfaces from nodes""" - mock_cli = Mock() + mock_kubecli = Mock() mock_pod_template = Mock() mock_pod_template.render.return_value = "pod_yaml" - mock_kube_helper.create_pod.return_value = None - mock_kube_helper.delete_pod.return_value = None + mock_kubecli.create_pod.return_value = None + mock_kubecli.delete_pod.return_value = None - # Test ingress_shaping.delete_virtual_interfaces( - cli=mock_cli, + kubecli=mock_kubecli, node_list=["node1", "node2"], pod_template=mock_pod_template, image="quay.io/krkn-chaos/krkn:tools" ) - # Assertions - self.assertEqual(mock_kube_helper.create_pod.call_count, 2) + self.assertEqual(mock_kubecli.create_pod.call_count, 2) self.assertEqual(mock_delete_ifb.call_count, 2) - self.assertEqual(mock_kube_helper.delete_pod.call_count, 2) + self.assertEqual(mock_kubecli.delete_pod.call_count, 2) @patch('krkn.scenario_plugins.native.network.ingress_shaping.Environment') @patch('krkn.scenario_plugins.native.network.ingress_shaping.FileSystemLoader') @@ -538,22 +500,18 @@ class NetworkScenariosTest(unittest.TestCase): @patch('krkn.scenario_plugins.native.network.ingress_shaping.wait_for_job') @patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') + @patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes') def test_network_chaos_parallel_execution( - self, mock_kube_helper, mock_get_nodes, mock_apply_filter, + self, mock_kubecli_class, mock_get_nodes, mock_apply_filter, mock_wait_job, mock_delete_virtual, mock_delete_jobs, mock_yaml, mock_file_loader, mock_env ): """Test network chaos with parallel execution""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli_class.return_value = Mock() mock_yaml.return_value = {"metadata": {"name": "test-pod"}} - mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli) mock_get_nodes.return_value = {"node1": ["eth0"], "node2": ["eth1"]} mock_apply_filter.side_effect = ["job1", "job2"] - # Test cfg = ingress_shaping.NetworkScenarioConfig( label_selector="node-role.kubernetes.io/worker", instance_count=2, @@ -565,7 +523,6 @@ class NetworkScenariosTest(unittest.TestCase): output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run") - # Assertions self.assertEqual(output_id, "success") self.assertEqual(output_data.filter_direction, "ingress") self.assertEqual(output_data.execution_type, "parallel") @@ -582,22 +539,18 @@ class NetworkScenariosTest(unittest.TestCase): @patch('krkn.scenario_plugins.native.network.ingress_shaping.wait_for_job') @patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') + @patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes') def test_network_chaos_serial_execution( - self, mock_kube_helper, mock_get_nodes, mock_apply_filter, + self, mock_kubecli_class, mock_get_nodes, mock_apply_filter, mock_wait_job, mock_delete_virtual, mock_delete_jobs, mock_sleep, mock_yaml, mock_file_loader, mock_env ): """Test network chaos with serial execution""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli_class.return_value = Mock() mock_yaml.return_value = {"metadata": {"name": "test-pod"}} - mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli) mock_get_nodes.return_value = {"node1": ["eth0"]} mock_apply_filter.return_value = "job1" - # Test cfg = ingress_shaping.NetworkScenarioConfig( label_selector="node-role.kubernetes.io/worker", instance_count=1, @@ -609,12 +562,9 @@ class NetworkScenariosTest(unittest.TestCase): output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run") - # Assertions self.assertEqual(output_id, "success") self.assertEqual(output_data.execution_type, "serial") - # Should be called once per parameter per node self.assertEqual(mock_apply_filter.call_count, 2) - # Should wait for jobs twice (once per parameter) self.assertEqual(mock_wait_job.call_count, 2) @patch('krkn.scenario_plugins.native.network.ingress_shaping.Environment') @@ -623,20 +573,16 @@ class NetworkScenariosTest(unittest.TestCase): @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_jobs') @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') + @patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes') def test_network_chaos_invalid_execution_type( - self, mock_kube_helper, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml, + self, mock_kubecli_class, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml, mock_file_loader, mock_env ): """Test network chaos with invalid execution type""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli_class.return_value = Mock() mock_yaml.return_value = {"metadata": {"name": "test-pod"}} - mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli) mock_get_nodes.return_value = {"node1": ["eth0"]} - # Test cfg = ingress_shaping.NetworkScenarioConfig( label_selector="node-role.kubernetes.io/worker", instance_count=1, @@ -647,7 +593,6 @@ class NetworkScenariosTest(unittest.TestCase): output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run") - # Assertions self.assertEqual(output_id, "error") self.assertIn("Invalid execution type", output_data.error) @@ -657,20 +602,16 @@ class NetworkScenariosTest(unittest.TestCase): @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_jobs') @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') + @patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes') def test_network_chaos_get_nodes_error( - self, mock_kube_helper, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml, + self, mock_kubecli_class, mock_get_nodes, mock_delete_virtual, mock_delete_jobs, mock_yaml, mock_file_loader, mock_env ): """Test network chaos when getting nodes fails""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli_class.return_value = Mock() mock_yaml.return_value = {"metadata": {"name": "test-pod"}} - mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli) mock_get_nodes.side_effect = Exception("Failed to get nodes") - # Test cfg = ingress_shaping.NetworkScenarioConfig( label_selector="node-role.kubernetes.io/worker", instance_count=1, @@ -679,7 +620,6 @@ class NetworkScenariosTest(unittest.TestCase): output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run") - # Assertions self.assertEqual(output_id, "error") self.assertIn("Failed to get nodes", output_data.error) @@ -690,22 +630,18 @@ class NetworkScenariosTest(unittest.TestCase): @patch('krkn.scenario_plugins.native.network.ingress_shaping.delete_virtual_interfaces') @patch('krkn.scenario_plugins.native.network.ingress_shaping.apply_ingress_filter') @patch('krkn.scenario_plugins.native.network.ingress_shaping.get_node_interfaces') - @patch('krkn.scenario_plugins.native.network.ingress_shaping.kube_helper') + @patch('krkn.scenario_plugins.native.network.ingress_shaping.KrknKubernetes') def test_network_chaos_apply_filter_error( - self, mock_kube_helper, mock_get_nodes, mock_apply_filter, + self, mock_kubecli_class, mock_get_nodes, mock_apply_filter, mock_delete_virtual, mock_delete_jobs, mock_yaml, mock_file_loader, mock_env ): """Test network chaos when applying filter fails""" - # Setup mocks - mock_cli = Mock() - mock_batch_cli = Mock() + mock_kubecli_class.return_value = Mock() mock_yaml.return_value = {"metadata": {"name": "test-pod"}} - mock_kube_helper.setup_kubernetes.return_value = (mock_cli, mock_batch_cli) mock_get_nodes.return_value = {"node1": ["eth0"]} mock_apply_filter.side_effect = Exception("Failed to apply filter") - # Test cfg = ingress_shaping.NetworkScenarioConfig( label_selector="node-role.kubernetes.io/worker", instance_count=1, @@ -715,10 +651,8 @@ class NetworkScenariosTest(unittest.TestCase): output_id, output_data = ingress_shaping.network_chaos(params=cfg, run_id="test-run") - # Assertions self.assertEqual(output_id, "error") self.assertIn("Failed to apply filter", output_data.error) - # Cleanup should still be called mock_delete_virtual.assert_called_once() diff --git a/tests/test_pod_network_outage.py b/tests/test_pod_network_outage.py index e22d2160..2fd31cad 100644 --- a/tests/test_pod_network_outage.py +++ b/tests/test_pod_network_outage.py @@ -1,13 +1,31 @@ +#!/usr/bin/env python3 + +""" +Test suite for pod network outage plugin + +This test suite covers the get_test_pods function in pod_network_outage_plugin +using mocks to avoid needing actual Kubernetes infrastructure. + +Test Coverage: +- get_test_pods with exclude_label filters +- get_test_pods with pod_name taking precedence over label filters +- get_test_pods with both pod_name and exclude_label + +IMPORTANT: These tests use mocking and do NOT require any Kubernetes cluster. +All Kubernetes API calls are mocked via unittest.mock. + +Usage: + python -m unittest tests/test_pod_network_outage.py -v + + # Run with coverage + python -m coverage run -a -m unittest tests/test_pod_network_outage.py -v + +Assisted By: Claude Code +""" + import unittest -from unittest.mock import MagicMock, patch -import sys -import os +from unittest.mock import MagicMock -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from krkn.scenario_plugins.native.pod_network_outage.kubernetes_functions import ( - list_pods, -) from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin import ( get_test_pods, ) @@ -15,36 +33,33 @@ from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin i class TestPodNetworkOutage(unittest.TestCase): def test_list_pods_with_exclude_label(self): - """Test that list_pods correctly excludes pods with matching exclude_label""" - # Create mock pod items - pod1 = MagicMock() - pod1.metadata.name = "pod1" - pod1.metadata.labels = {"app": "test", "skip": "true"} + """Test that get_test_pods passes exclude_label to kubecli.list_pods""" + mock_kubecli = MagicMock() + mock_kubecli.list_pods.return_value = ["pod2", "pod3"] - pod2 = MagicMock() - pod2.metadata.name = "pod2" - pod2.metadata.labels = {"app": "test"} + result = get_test_pods(None, "app=test", "test-namespace", mock_kubecli, "skip=true") - pod3 = MagicMock() - pod3.metadata.name = "pod3" - pod3.metadata.labels = {"app": "test", "skip": "false"} - - # Create mock API response - mock_response = MagicMock() - mock_response.items = [pod1, pod2, pod3] - - # Create mock client - mock_cli = MagicMock() - mock_cli.list_namespaced_pod.return_value = mock_response - - # Test without exclude_label - result = list_pods(mock_cli, "test-namespace", "app=test") - self.assertEqual(result, ["pod1", "pod2", "pod3"]) - - # Test with exclude_label - result = list_pods(mock_cli, "test-namespace", "app=test", "skip=true") + mock_kubecli.list_pods.assert_called_once_with( + label_selector="app=test", + namespace="test-namespace", + exclude_label="skip=true", + ) self.assertEqual(result, ["pod2", "pod3"]) + def test_list_pods_without_exclude_label(self): + """Test that get_test_pods works without exclude_label""" + mock_kubecli = MagicMock() + mock_kubecli.list_pods.return_value = ["pod1", "pod2", "pod3"] + + result = get_test_pods(None, "app=test", "test-namespace", mock_kubecli) + + mock_kubecli.list_pods.assert_called_once_with( + label_selector="app=test", + namespace="test-namespace", + exclude_label=None, + ) + self.assertEqual(result, ["pod1", "pod2", "pod3"]) + def test_get_test_pods_with_exclude_label(self): """Test that get_test_pods passes exclude_label to list_pods correctly""" # Create mock kubecli