mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-02-14 18:10:00 +00:00
api refactoring + pod network filter scenario
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
This commit is contained in:
@@ -26,12 +26,7 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
|
||||
|
||||
try:
|
||||
self.start_monitoring(pool, kill_scenarios)
|
||||
PLUGINS.run(
|
||||
scenario,
|
||||
lib_telemetry.get_lib_kubernetes().get_kubeconfig_path(),
|
||||
krkn_config,
|
||||
run_uuid,
|
||||
)
|
||||
PLUGINS.run(scenario, krkn_config)
|
||||
result = pool.join()
|
||||
scenario_telemetry.affected_pods = result
|
||||
if result.error:
|
||||
@@ -49,7 +44,7 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
|
||||
return [
|
||||
"pod_disruption_scenarios",
|
||||
"pod_network_scenarios",
|
||||
"ingress_node_scenarios"
|
||||
"ingress_node_scenarios",
|
||||
]
|
||||
|
||||
def start_monitoring(self, pool: PodsMonitorPool, scenarios: list[Any]):
|
||||
|
||||
@@ -28,7 +28,7 @@ class BaseNetworkChaosConfig:
|
||||
errors.append(
|
||||
f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}"
|
||||
)
|
||||
if self.label_selector is None:
|
||||
if self.id == "node_network_filter" and self.label_selector is None:
|
||||
errors.append("label_selector cannot be None")
|
||||
return errors
|
||||
|
||||
|
||||
@@ -3,19 +3,25 @@ import logging
|
||||
import queue
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import BaseNetworkChaosConfig, NetworkChaosScenarioType
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
BaseNetworkChaosConfig,
|
||||
NetworkChaosScenarioType,
|
||||
)
|
||||
|
||||
|
||||
class AbstractNetworkChaosModule(abc.ABC):
|
||||
"""
|
||||
The abstract class that needs to be implemented by each Network Chaos Scenario
|
||||
"""
|
||||
|
||||
kubecli: KrknTelemetryOpenshift
|
||||
base_network_config: BaseNetworkChaosConfig
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self, target: str, kubecli: KrknTelemetryOpenshift, error_queue: queue.Queue = None):
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
"""
|
||||
the entrypoint method for the Network Chaos Scenario
|
||||
:param target: The resource name that will be targeted by the scenario (Node Name, Pod Name etc.)
|
||||
:param kubecli: The `KrknTelemetryOpenshift` needed by the scenario to access to the krkn-lib methods
|
||||
:param error_queue: A queue that will be used by the plugin to push the errors raised during the execution of parallel modules
|
||||
"""
|
||||
pass
|
||||
@@ -28,31 +34,17 @@ class AbstractNetworkChaosModule(abc.ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_targets(self) -> list[str]:
|
||||
"""
|
||||
checks and returns the targets based on the common scenario configuration
|
||||
"""
|
||||
|
||||
def log_info(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for INFO severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.info(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.info(message)
|
||||
pass
|
||||
|
||||
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for WARNING severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.warning(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.warning(message)
|
||||
|
||||
|
||||
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for ERROR severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.error(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.error(message)
|
||||
def __init__(
|
||||
self,
|
||||
base_network_config: BaseNetworkChaosConfig,
|
||||
kubecli: KrknTelemetryOpenshift,
|
||||
):
|
||||
self.kubecli = kubecli
|
||||
self.base_network_config = base_network_config
|
||||
|
||||
@@ -1,11 +1,6 @@
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
|
||||
import yaml
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_random_string
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
@@ -16,88 +11,92 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
|
||||
deploy_network_filter_pod,
|
||||
apply_network_rules,
|
||||
clean_network_rules,
|
||||
generate_rules,
|
||||
get_default_interface,
|
||||
)
|
||||
|
||||
|
||||
class NodeNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
config: NetworkFilterConfig
|
||||
kubecli: KrknTelemetryOpenshift
|
||||
|
||||
def run(
|
||||
self,
|
||||
target: str,
|
||||
kubecli: KrknTelemetryOpenshift,
|
||||
error_queue: queue.Queue = None,
|
||||
):
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
parallel = False
|
||||
if error_queue:
|
||||
parallel = True
|
||||
try:
|
||||
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
|
||||
env = Environment(loader=file_loader, autoescape=True)
|
||||
pod_name = f"node-filter-{get_random_string(5)}"
|
||||
pod_template = env.get_template("templates/network-chaos.j2")
|
||||
pod_body = yaml.safe_load(
|
||||
pod_template.render(
|
||||
pod_name=pod_name,
|
||||
namespace=self.config.namespace,
|
||||
host_network=True,
|
||||
target=target,
|
||||
workload_image=self.config.image,
|
||||
)
|
||||
)
|
||||
self.log_info(
|
||||
f"creating pod to filter "
|
||||
log_info(
|
||||
f"creating workload to filter node {self.config.target} network"
|
||||
f"ports {','.join([str(port) for port in self.config.ports])}, "
|
||||
f"ingress:{str(self.config.ingress)}, "
|
||||
f"egress:{str(self.config.egress)}",
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
kubecli.get_lib_kubernetes().create_pod(
|
||||
pod_body, self.config.namespace, 300
|
||||
|
||||
pod_name = f"node-filter-{get_random_string(5)}"
|
||||
deploy_network_filter_pod(
|
||||
self.config,
|
||||
target,
|
||||
pod_name,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
|
||||
if len(self.config.interfaces) == 0:
|
||||
interfaces = [
|
||||
self.get_default_interface(pod_name, self.config.namespace, kubecli)
|
||||
get_default_interface(
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
]
|
||||
self.log_info(f"detected default interface {interfaces[0]}")
|
||||
|
||||
log_info(
|
||||
f"detected default interface {interfaces[0]}", parallel, target
|
||||
)
|
||||
|
||||
else:
|
||||
interfaces = self.config.interfaces
|
||||
|
||||
input_rules, output_rules = self.generate_rules(interfaces)
|
||||
input_rules, output_rules = generate_rules(interfaces, self.config)
|
||||
|
||||
for rule in input_rules:
|
||||
self.log_info(f"applying iptables INPUT rule: {rule}", parallel, target)
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[rule], pod_name, self.config.namespace
|
||||
)
|
||||
for rule in output_rules:
|
||||
self.log_info(
|
||||
f"applying iptables OUTPUT rule: {rule}", parallel, target
|
||||
)
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[rule], pod_name, self.config.namespace
|
||||
)
|
||||
self.log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules"
|
||||
apply_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
time.sleep(self.config.test_duration)
|
||||
self.log_info("removing iptables rules")
|
||||
for _ in input_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[f"iptables -D INPUT 1"], pod_name, self.config.namespace
|
||||
)
|
||||
for _ in output_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[f"iptables -D OUTPUT 1"], pod_name, self.config.namespace
|
||||
)
|
||||
self.log_info(
|
||||
f"deleting network chaos pod {pod_name} from {self.config.namespace}"
|
||||
|
||||
log_info("removing iptables rules", parallel, target)
|
||||
|
||||
clean_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
)
|
||||
|
||||
kubecli.get_lib_kubernetes().delete_pod(pod_name, self.config.namespace)
|
||||
self.kubecli.get_lib_kubernetes().delete_pod(
|
||||
pod_name, self.config.namespace
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
if error_queue is None:
|
||||
@@ -105,33 +104,25 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
else:
|
||||
error_queue.put(str(e))
|
||||
|
||||
def __init__(self, config: NetworkFilterConfig):
|
||||
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
|
||||
super().__init__(config, kubecli)
|
||||
self.config = config
|
||||
|
||||
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
|
||||
return NetworkChaosScenarioType.Node, self.config
|
||||
|
||||
def get_default_interface(
|
||||
self, pod_name: str, namespace: str, kubecli: KrknTelemetryOpenshift
|
||||
) -> str:
|
||||
cmd = "ip r | grep default | awk '/default/ {print $5}'"
|
||||
output = kubecli.get_lib_kubernetes().exec_cmd_in_pod(
|
||||
[cmd], pod_name, namespace
|
||||
)
|
||||
return output.replace("\n", "")
|
||||
def get_targets(self) -> list[str]:
|
||||
if self.base_network_config.label_selector:
|
||||
return self.kubecli.get_lib_kubernetes().list_nodes(
|
||||
self.base_network_config.label_selector
|
||||
)
|
||||
else:
|
||||
if not self.config.target:
|
||||
raise Exception(
|
||||
"neither node selector nor node_name (target) specified, aborting."
|
||||
)
|
||||
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
|
||||
if self.config.target not in node_info:
|
||||
raise Exception(f"node {self.config.target} not found, aborting")
|
||||
|
||||
def generate_rules(self, interfaces: list[str]) -> (list[str], list[str]):
|
||||
input_rules = []
|
||||
output_rules = []
|
||||
for interface in interfaces:
|
||||
for port in self.config.ports:
|
||||
if self.config.egress:
|
||||
output_rules.append(
|
||||
f"iptables -I OUTPUT 1 -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
|
||||
if self.config.ingress:
|
||||
input_rules.append(
|
||||
f"iptables -I INPUT 1 -i {interface} -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
return input_rules, output_rules
|
||||
return [self.config.target]
|
||||
|
||||
@@ -0,0 +1,179 @@
|
||||
import queue
|
||||
import time
|
||||
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
from krkn_lib.utils import get_random_string
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
NetworkChaosScenarioType,
|
||||
BaseNetworkChaosConfig,
|
||||
NetworkFilterConfig,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
|
||||
deploy_network_filter_pod,
|
||||
get_default_interface,
|
||||
generate_namespaced_rules,
|
||||
apply_network_rules,
|
||||
clean_network_rules_namespaced,
|
||||
)
|
||||
|
||||
|
||||
class PodNetworkFilterModule(AbstractNetworkChaosModule):
|
||||
config: NetworkFilterConfig
|
||||
|
||||
def run(self, target: str, error_queue: queue.Queue = None):
|
||||
parallel = False
|
||||
if error_queue:
|
||||
parallel = True
|
||||
try:
|
||||
pod_name = f"pod-filter-{get_random_string(5)}"
|
||||
container_name = f"fedora-container-{get_random_string(5)}"
|
||||
pod_info = self.kubecli.get_lib_kubernetes().get_pod_info(
|
||||
self.config.target, self.config.namespace
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"creating workload to filter pod {self.config.target} network"
|
||||
f"ports {','.join([str(port) for port in self.config.ports])}, "
|
||||
f"ingress:{str(self.config.ingress)}, "
|
||||
f"egress:{str(self.config.egress)}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
if not pod_info:
|
||||
raise Exception(
|
||||
f"impossible to retrieve infos for pod {self.config.target} namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
deploy_network_filter_pod(
|
||||
self.config,
|
||||
pod_info.nodeName,
|
||||
pod_name,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
container_name,
|
||||
)
|
||||
|
||||
if len(self.config.interfaces) == 0:
|
||||
interfaces = [
|
||||
get_default_interface(
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
)
|
||||
]
|
||||
|
||||
log_info(
|
||||
f"detected default interface {interfaces[0]}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
else:
|
||||
interfaces = self.config.interfaces
|
||||
|
||||
container_ids = self.kubecli.get_lib_kubernetes().get_container_ids(
|
||||
self.config.target, self.config.namespace
|
||||
)
|
||||
|
||||
if len(container_ids) == 0:
|
||||
raise Exception(
|
||||
f"impossible to resolve container id for pod {self.config.target} namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
log_info(f"targeting container {container_ids[0]}", parallel, pod_name)
|
||||
|
||||
pid = self.kubecli.get_lib_kubernetes().get_pod_pid(
|
||||
base_pod_name=pod_name,
|
||||
base_pod_namespace=self.config.namespace,
|
||||
base_pod_container_name=container_name,
|
||||
pod_name=self.config.target,
|
||||
pod_namespace=self.config.namespace,
|
||||
pod_container_id=container_ids[0],
|
||||
)
|
||||
|
||||
if not pid:
|
||||
raise Exception(
|
||||
f"impossible to resolve pid for pod {self.config.target}"
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"resolved pid {pid} in node {pod_info.nodeName} for pod {self.config.target}",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
input_rules, output_rules = generate_namespaced_rules(
|
||||
interfaces, self.config, pid
|
||||
)
|
||||
|
||||
apply_network_rules(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
parallel,
|
||||
target,
|
||||
)
|
||||
|
||||
log_info(
|
||||
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
|
||||
parallel,
|
||||
pod_name,
|
||||
)
|
||||
|
||||
time.sleep(self.config.test_duration)
|
||||
|
||||
log_info("removing iptables rules", parallel, pod_name)
|
||||
|
||||
clean_network_rules_namespaced(
|
||||
self.kubecli.get_lib_kubernetes(),
|
||||
input_rules,
|
||||
output_rules,
|
||||
pod_name,
|
||||
self.config.namespace,
|
||||
pid,
|
||||
)
|
||||
|
||||
self.kubecli.get_lib_kubernetes().delete_pod(
|
||||
pod_name, self.config.namespace
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
if error_queue is None:
|
||||
raise e
|
||||
else:
|
||||
error_queue.put(str(e))
|
||||
|
||||
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
|
||||
super().__init__(config, kubecli)
|
||||
self.config = config
|
||||
|
||||
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
|
||||
return NetworkChaosScenarioType.Pod, self.config
|
||||
|
||||
def get_targets(self) -> list[str]:
|
||||
if not self.config.namespace:
|
||||
raise Exception("namespace not specified, aborting")
|
||||
if self.base_network_config.label_selector:
|
||||
return self.kubecli.get_lib_kubernetes().list_pods(
|
||||
self.config.namespace, self.config.label_selector
|
||||
)
|
||||
else:
|
||||
if not self.config.target:
|
||||
raise Exception(
|
||||
"neither node selector nor node_name (target) specified, aborting."
|
||||
)
|
||||
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
|
||||
self.config.target, self.config.namespace
|
||||
):
|
||||
raise Exception(
|
||||
f"pod {self.config.target} not found in namespace {self.config.namespace}"
|
||||
)
|
||||
|
||||
return [self.config.target]
|
||||
@@ -7,10 +7,11 @@ spec:
|
||||
{% if host_network %}
|
||||
hostNetwork: true
|
||||
{%endif%}
|
||||
hostPID: true
|
||||
nodeSelector:
|
||||
kubernetes.io/hostname: {{target}}
|
||||
containers:
|
||||
- name: fedora
|
||||
- name: {{container_name}}
|
||||
imagePullPolicy: Always
|
||||
image: {{workload_image}}
|
||||
securityContext:
|
||||
|
||||
31
krkn/scenario_plugins/network_chaos_ng/modules/utils.py
Normal file
31
krkn/scenario_plugins/network_chaos_ng/modules/utils.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import logging
|
||||
|
||||
|
||||
def log_info(message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for INFO severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.info(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.info(message)
|
||||
|
||||
|
||||
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for ERROR severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.error(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.error(message)
|
||||
|
||||
|
||||
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
|
||||
"""
|
||||
log helper method for WARNING severity to be used in the scenarios
|
||||
"""
|
||||
if parallel:
|
||||
logging.warning(f"[{node_name}]: {message}")
|
||||
else:
|
||||
logging.warning(message)
|
||||
@@ -0,0 +1,131 @@
|
||||
import os
|
||||
|
||||
import yaml
|
||||
from jinja2 import FileSystemLoader, Environment
|
||||
from krkn_lib.k8s import KrknKubernetes
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
|
||||
|
||||
|
||||
def generate_rules(
|
||||
interfaces: list[str], config: NetworkFilterConfig
|
||||
) -> (list[str], list[str]):
|
||||
input_rules = []
|
||||
output_rules = []
|
||||
for interface in interfaces:
|
||||
for port in config.ports:
|
||||
if config.egress:
|
||||
output_rules.append(
|
||||
f"iptables -I OUTPUT 1 -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
|
||||
if config.ingress:
|
||||
input_rules.append(
|
||||
f"iptables -I INPUT 1 -i {interface} -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
|
||||
)
|
||||
return input_rules, output_rules
|
||||
|
||||
|
||||
def generate_namespaced_rules(
|
||||
interfaces: list[str], config: NetworkFilterConfig, pid: str
|
||||
) -> (list[str], list[str]):
|
||||
|
||||
input_rules, output_rules = generate_rules(interfaces, config)
|
||||
|
||||
namespaced_input_rules = [
|
||||
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
|
||||
]
|
||||
namespaced_output_rules = [
|
||||
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
|
||||
]
|
||||
|
||||
return namespaced_input_rules, namespaced_output_rules
|
||||
|
||||
|
||||
def deploy_network_filter_pod(
|
||||
config: NetworkFilterConfig,
|
||||
target_node: str,
|
||||
pod_name: str,
|
||||
kubecli: KrknKubernetes,
|
||||
container_name: str = "fedora",
|
||||
):
|
||||
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
|
||||
env = Environment(loader=file_loader, autoescape=True)
|
||||
pod_template = env.get_template("templates/network-chaos.j2")
|
||||
pod_body = yaml.safe_load(
|
||||
pod_template.render(
|
||||
pod_name=pod_name,
|
||||
namespace=config.namespace,
|
||||
host_network=True,
|
||||
target=target_node,
|
||||
container_name=container_name,
|
||||
workload_image=config.image,
|
||||
)
|
||||
)
|
||||
|
||||
kubecli.create_pod(pod_body, config.namespace, 300)
|
||||
|
||||
|
||||
def apply_network_rules(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
parallel: bool,
|
||||
node_name: str,
|
||||
):
|
||||
for rule in input_rules:
|
||||
log_info(f"applying iptables INPUT rule: {rule}", parallel, node_name)
|
||||
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
|
||||
for rule in output_rules:
|
||||
log_info(f"applying iptables OUTPUT rule: {rule}", parallel, node_name)
|
||||
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
|
||||
|
||||
|
||||
def clean_network_rules(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
):
|
||||
for _ in input_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod([f"iptables -D INPUT 1"], pod_name, namespace)
|
||||
for _ in output_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod([f"iptables -D OUTPUT 1"], pod_name, namespace)
|
||||
|
||||
|
||||
def clean_network_rules_namespaced(
|
||||
kubecli: KrknKubernetes,
|
||||
input_rules: list[str],
|
||||
output_rules: list[str],
|
||||
pod_name: str,
|
||||
namespace: str,
|
||||
pid: str,
|
||||
):
|
||||
for _ in input_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod(
|
||||
[f"nsenter --target {pid} --net -- iptables -D INPUT 1"],
|
||||
pod_name,
|
||||
namespace,
|
||||
)
|
||||
for _ in output_rules:
|
||||
# always deleting the first rule since has been inserted from the top
|
||||
kubecli.exec_cmd_in_pod(
|
||||
[f"nsenter --target {pid} --net -- iptables -D OUTPUT 1"],
|
||||
pod_name,
|
||||
namespace,
|
||||
)
|
||||
|
||||
|
||||
def get_default_interface(
|
||||
pod_name: str, namespace: str, kubecli: KrknKubernetes
|
||||
) -> str:
|
||||
cmd = "ip r | grep default | awk '/default/ {print $5}'"
|
||||
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
|
||||
return output.replace("\n", "")
|
||||
@@ -1,14 +1,25 @@
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import AbstractNetworkChaosModule
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import NodeNetworkFilterModule
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import (
|
||||
NodeNetworkFilterModule,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_filter import (
|
||||
PodNetworkFilterModule,
|
||||
)
|
||||
|
||||
supported_modules = ["node_network_filter", "pod_network_filter"]
|
||||
|
||||
supported_modules = ["node_network_filter"]
|
||||
|
||||
class NetworkChaosFactory:
|
||||
|
||||
@staticmethod
|
||||
def get_instance(config: dict[str, str]) -> AbstractNetworkChaosModule:
|
||||
def get_instance(
|
||||
config: dict[str, str], kubecli: KrknTelemetryOpenshift
|
||||
) -> AbstractNetworkChaosModule:
|
||||
if config["id"] is None:
|
||||
raise Exception("network chaos id cannot be None")
|
||||
if config["id"] not in supported_modules:
|
||||
@@ -19,6 +30,10 @@ class NetworkChaosFactory:
|
||||
errors = config.validate()
|
||||
if len(errors) > 0:
|
||||
raise Exception(f"config validation errors: [{';'.join(errors)}]")
|
||||
return NodeNetworkFilterModule(config)
|
||||
|
||||
|
||||
return NodeNetworkFilterModule(config, kubecli)
|
||||
if config["id"] == "pod_network_filter":
|
||||
config = NetworkFilterConfig(**config)
|
||||
errors = config.validate()
|
||||
if len(errors) > 0:
|
||||
raise Exception(f"config validation errors: [{';'.join(errors)}]")
|
||||
return PodNetworkFilterModule(config, kubecli)
|
||||
|
||||
@@ -9,10 +9,6 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
|
||||
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
|
||||
|
||||
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
|
||||
from krkn.scenario_plugins.network_chaos_ng.models import (
|
||||
NetworkChaosScenarioType,
|
||||
BaseNetworkChaosConfig,
|
||||
)
|
||||
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
|
||||
AbstractNetworkChaosModule,
|
||||
)
|
||||
@@ -39,56 +35,52 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
|
||||
)
|
||||
return 1
|
||||
for config in scenario_config:
|
||||
network_chaos = NetworkChaosFactory.get_instance(config)
|
||||
network_chaos_config = network_chaos.get_config()
|
||||
logging.info(
|
||||
f"running network_chaos scenario: {network_chaos_config[1].id}"
|
||||
network_chaos = NetworkChaosFactory.get_instance(
|
||||
config, lib_telemetry
|
||||
)
|
||||
if network_chaos_config[0] == NetworkChaosScenarioType.Node:
|
||||
targets = lib_telemetry.get_lib_kubernetes().list_nodes(
|
||||
network_chaos_config[1].label_selector
|
||||
)
|
||||
else:
|
||||
targets = lib_telemetry.get_lib_kubernetes().list_pods(
|
||||
network_chaos_config[1].namespace,
|
||||
network_chaos_config[1].label_selector,
|
||||
)
|
||||
network_chaos_type, network_chaos_config = (
|
||||
network_chaos.get_config()
|
||||
)
|
||||
logging.info(
|
||||
f"running network_chaos scenario: {network_chaos_config.id}"
|
||||
)
|
||||
targets = network_chaos.get_targets()
|
||||
if len(targets) == 0:
|
||||
logging.warning(
|
||||
f"no targets found for {network_chaos_config[1].id} "
|
||||
f"network chaos scenario with selector {network_chaos_config[1].label_selector} "
|
||||
f"with target type {network_chaos_config[0]}"
|
||||
f"no targets found for {network_chaos_config.id} "
|
||||
f"network chaos scenario with selector {network_chaos_config.label_selector} "
|
||||
f"with target type {network_chaos_type}"
|
||||
)
|
||||
|
||||
if network_chaos_config[1].instance_count != 0 and network_chaos_config[1].instance_count > len(targets):
|
||||
targets = random.sample(targets, network_chaos_config[1].instance_count)
|
||||
if (
|
||||
network_chaos_config.instance_count != 0
|
||||
and network_chaos_config.instance_count > len(targets)
|
||||
):
|
||||
targets = random.sample(
|
||||
targets, network_chaos_config.instance_count
|
||||
)
|
||||
|
||||
if network_chaos_config[1].execution == "parallel":
|
||||
self.run_parallel(targets, network_chaos, lib_telemetry)
|
||||
if network_chaos_config.execution == "parallel":
|
||||
self.run_parallel(targets, network_chaos)
|
||||
else:
|
||||
self.run_serial(targets, network_chaos, lib_telemetry)
|
||||
self.run_serial(targets, network_chaos)
|
||||
if len(config) > 1:
|
||||
logging.info(f"waiting {network_chaos_config[1].wait_duration} seconds before running the next "
|
||||
f"Network Chaos NG Module")
|
||||
time.sleep(network_chaos_config[1].wait_duration)
|
||||
logging.info(
|
||||
f"waiting {network_chaos_config.wait_duration} seconds before running the next "
|
||||
f"Network Chaos NG Module"
|
||||
)
|
||||
time.sleep(network_chaos_config.wait_duration)
|
||||
except Exception as e:
|
||||
logging.error(str(e))
|
||||
return 1
|
||||
return 0
|
||||
|
||||
def run_parallel(
|
||||
self,
|
||||
targets: list[str],
|
||||
module: AbstractNetworkChaosModule,
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
):
|
||||
def run_parallel(self, targets: list[str], module: AbstractNetworkChaosModule):
|
||||
error_queue = queue.Queue()
|
||||
threads = []
|
||||
errors = []
|
||||
for target in targets:
|
||||
thread = threading.Thread(
|
||||
target=module.run, args=[target, lib_telemetry, error_queue]
|
||||
)
|
||||
thread = threading.Thread(target=module.run, args=[target, error_queue])
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
for thread in threads:
|
||||
@@ -103,14 +95,9 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
|
||||
f"module {module.get_config()[1].id} execution failed: [{';'.join(errors)}]"
|
||||
)
|
||||
|
||||
def run_serial(
|
||||
self,
|
||||
targets: list[str],
|
||||
module: AbstractNetworkChaosModule,
|
||||
lib_telemetry: KrknTelemetryOpenshift,
|
||||
):
|
||||
def run_serial(self, targets: list[str], module: AbstractNetworkChaosModule):
|
||||
for target in targets:
|
||||
module.run(target, lib_telemetry)
|
||||
module.run(target)
|
||||
|
||||
def get_scenario_types(self) -> list[str]:
|
||||
return ["network_chaos_ng_scenarios"]
|
||||
|
||||
@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
|
||||
ibm_cloud_sdk_core==3.18.0
|
||||
ibm_vpc==0.20.0
|
||||
jinja2==3.1.6
|
||||
krkn-lib==5.0.2
|
||||
|
||||
lxml==5.1.0
|
||||
kubernetes==28.1.0
|
||||
numpy==1.26.4
|
||||
@@ -36,7 +36,7 @@ werkzeug==3.0.6
|
||||
wheel==0.42.0
|
||||
zope.interface==5.4.0
|
||||
|
||||
|
||||
git+https://github.com/krkn-chaos/krkn-lib.git@dns_disruption
|
||||
git+https://github.com/krkn-chaos/arcaflow-plugin-kill-pod.git@v0.1.0
|
||||
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
|
||||
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
execution: parallel
|
||||
ingress: false
|
||||
egress: true
|
||||
target: node
|
||||
target: ''
|
||||
interfaces: []
|
||||
ports:
|
||||
- 2049
|
||||
- 53
|
||||
14
scenarios/kube/pod-network-filter.yml
Normal file
14
scenarios/kube/pod-network-filter.yml
Normal file
@@ -0,0 +1,14 @@
|
||||
- id: pod_network_filter
|
||||
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
|
||||
wait_duration: 1
|
||||
test_duration: 60ipt
|
||||
label_selector: ""
|
||||
namespace: 'default'
|
||||
instance_count: 1
|
||||
execution: parallel
|
||||
ingress: false
|
||||
egress: true
|
||||
target: "network-victim"
|
||||
interfaces: []
|
||||
ports:
|
||||
- 80
|
||||
Reference in New Issue
Block a user