Compare commits

...

7 Commits

Author SHA1 Message Date
yogananth subramanian
dc8d7ad75b Add disk detach/attach scenario to baremetal node actions (#855)
- Implemented methods for detaching and attaching disks to baremetal nodes.
- Added a new scenario `node_disk_detach_attach_scenario` to manage disk operations.
- Updated the YAML configuration to include the new scenario with disk details.

Signed-off-by: Yogananth Subramanian <ysubrama@redhat.com>
2025-07-03 17:18:57 +02:00
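The detach/attach mechanism described in the commit above works by toggling the block devices' sysfs state ("offline" to detach, "running" to re-attach) from a privileged helper pod that chroots into the host. A minimal sketch of that command construction follows; the disk names and the standalone helper are illustrative only, while the actual change runs the commands through kubecli.exec_command_on_node inside the baremetal node scenarios.

# Minimal sketch (assumption: privileged helper pod with /host mounted and hostPID).
def build_disk_state_command(disks: list[str], state: str) -> list[str]:
    # "offline" detaches the disk, "running" re-attaches it
    script = "".join(f"echo {state} > /sys/block/{disk}/device/state;" for disk in disks)
    return ["chroot /host /bin/sh -c '" + script + "'"]

detach_cmd = build_disk_state_command(["sdb", "sdc"], "offline")  # illustrative disk names
attach_cmd = build_disk_state_command(["sdb", "sdc"], "running")
# each command list is executed in the helper pod scheduled on the target node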
Paige Patton
1cc44e1f18 adding non native version of pod scenarios (#847)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-03 15:46:13 +02:00
Paige Patton
c8190fd1c1 adding pod test (#858)
Signed-off-by: Paige Patton <prubenda@redhat.com>
2025-07-03 15:00:51 +02:00
Tullio Sebastiani
9078b35e46 updated krkn-lib
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-07-02 17:30:58 +02:00
Tullio Sebastiani
e6b1665aa1 added toleration to schedule pod on master
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-30 10:30:47 +02:00
Tullio Sebastiani
c56819365c minor nits fixes
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-27 15:12:45 +02:00
Tullio Sebastiani
6a657576cb api refactoring + pod network filter scenario
Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2025-06-26 15:51:35 +02:00
26 changed files with 870 additions and 237 deletions

View File

@@ -82,6 +82,7 @@ jobs:
echo "test_service_hijacking" > ./CI/tests/functional_tests
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
echo "test_namespace" >> ./CI/tests/functional_tests
echo "test_net_chaos" >> ./CI/tests/functional_tests
echo "test_time" >> ./CI/tests/functional_tests
@@ -111,6 +112,7 @@ jobs:
echo "test_service_hijacking" >> ./CI/tests/functional_tests
echo "test_app_outages" >> ./CI/tests/functional_tests
echo "test_container" >> ./CI/tests/functional_tests
echo "test_pod" >> ./CI/tests/functional_tests
echo "test_namespace" >> ./CI/tests/functional_tests
echo "test_net_chaos" >> ./CI/tests/functional_tests
echo "test_time" >> ./CI/tests/functional_tests

View File

@@ -2,12 +2,9 @@ kraken:
distribution: kubernetes # Distribution can be kubernetes or openshift.
kubeconfig_path: ~/.kube/config # Path to kubeconfig.
exit_on_failure: False # Exit when a post action scenario fails.
litmus_version: v1.13.6 # Litmus version to install.
litmus_uninstall: False # If you want to uninstall litmus if failure.
chaos_scenarios: # List of policies/chaos scenarios to load.
- $scenario_type: # List of chaos pod scenarios to load.
- $scenario_file
$post_config
cerberus:
cerberus_enabled: False # Enable it when cerberus is previously installed.
cerberus_url: # When cerberus_enabled is set to True, provide the url where cerberus publishes go/no-go signal.
@@ -52,8 +49,6 @@ telemetry:
telemetry_group: "funtests"
elastic:
enable_elastic: False
collect_metrics: False
collect_alerts: False
verify_certs: False
elastic_url: "https://192.168.39.196" # To track results in elasticsearch, give url to server here; will post telemetry details when url and index not blank
elastic_port: 32766

CI/tests/test_pod.sh (new executable file, 18 lines added)
View File

@@ -0,0 +1,18 @@
set -xeEo pipefail
source CI/tests/common.sh
trap error ERR
trap finish EXIT
function functional_test_pod_crash {
export scenario_type="pod_disruption_scenarios"
export scenario_file="scenarios/kind/pod_etcd.yml"
export post_config=""
envsubst < CI/config/common_test_config.yaml > CI/config/pod_config.yaml
python3 -m coverage run -a run_kraken.py -c CI/config/pod_config.yaml
echo "Pod disruption scenario test: Success"
}
functional_test_pod_crash

View File

@@ -117,3 +117,5 @@ class AbstractScenarioPlugin(ABC):
logging.info(f"wating {wait_duration} before running the next scenario")
time.sleep(wait_duration)
return failed_scenarios, scenario_telemetries

View File

@@ -9,7 +9,7 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
from krkn import cerberus
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
@@ -44,7 +44,6 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
return 1
scenario_telemetry.affected_pods = result
# publish cerberus status
except (RuntimeError, Exception):
logging.error("ContainerScenarioPlugin exiting due to Exception %s")
return 1
@@ -63,6 +62,7 @@ class ContainerScenarioPlugin(AbstractScenarioPlugin):
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
def container_killing_in_pod(self, cont_scenario, kubecli: KrknKubernetes):

View File

@@ -17,26 +17,14 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
kill_scenarios = [
kill_scenario
for kill_scenario in PLUGINS.unserialize_scenario(scenario)
if kill_scenario["id"] == "kill-pods"
]
try:
self.start_monitoring(pool, kill_scenarios)
PLUGINS.run(
scenario,
lib_telemetry.get_lib_kubernetes().get_kubeconfig_path(),
krkn_config,
run_uuid,
)
result = pool.join()
scenario_telemetry.affected_pods = result
if result.error:
logging.error(f"NativeScenarioPlugin unrecovered pods: {result.error}")
return 1
except Exception as e:
logging.error("NativeScenarioPlugin exiting due to Exception %s" % e)
@@ -47,46 +35,6 @@ class NativeScenarioPlugin(AbstractScenarioPlugin):
def get_scenario_types(self) -> list[str]:
return [
"pod_disruption_scenarios",
"pod_network_scenarios",
"ingress_node_scenarios"
]
def start_monitoring(self, pool: PodsMonitorPool, scenarios: list[Any]):
for kill_scenario in scenarios:
recovery_time = kill_scenario["config"]["krkn_pod_recovery_time"]
if (
"namespace_pattern" in kill_scenario["config"]
and "label_selector" in kill_scenario["config"]
):
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
label_selector = kill_scenario["config"]["label_selector"]
pool.select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
)
logging.info(
f"waiting {recovery_time} seconds for pod recovery, "
f"pod label selector: {label_selector} namespace pattern: {namespace_pattern}"
)
elif (
"namespace_pattern" in kill_scenario["config"]
and "name_pattern" in kill_scenario["config"]
):
namespace_pattern = kill_scenario["config"]["namespace_pattern"]
name_pattern = kill_scenario["config"]["name_pattern"]
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
pod_name_pattern=name_pattern,
namespace_pattern=namespace_pattern,
max_timeout=recovery_time,
)
logging.info(
f"waiting {recovery_time} seconds for pod recovery, "
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
)
else:
raise Exception(
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
)

View File

@@ -4,7 +4,6 @@ import logging
from os.path import abspath
from typing import List, Any, Dict
from krkn.scenario_plugins.native.run_python_plugin import run_python_file
from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
from krkn.scenario_plugins.native.network.ingress_shaping import network_chaos
from krkn.scenario_plugins.native.pod_network_outage.pod_network_outage_plugin import (
pod_outage,
@@ -148,13 +147,6 @@ class Plugins:
PLUGINS = Plugins(
[
PluginStep(
kill_pods,
[
"error",
],
),
PluginStep(wait_for_pods, ["error"]),
PluginStep(run_python_file, ["error"]),
PluginStep(network_chaos, ["error"]),
PluginStep(pod_outage, ["error"]),

View File

@@ -28,8 +28,12 @@ class BaseNetworkChaosConfig:
errors.append(
f"{self.execution} is not in supported execution mod: {','.join(self.supported_execution)}"
)
if self.label_selector is None:
if self.id == "node_network_filter" and self.label_selector is None:
errors.append("label_selector cannot be None")
if not isinstance(self.wait_duration, int):
errors.append("wait_duration must be an int")
if not isinstance(self.test_duration, int):
errors.append("test_duration must be an int")
return errors
@@ -41,8 +45,14 @@ class NetworkFilterConfig(BaseNetworkChaosConfig):
target: str
ports: list[int]
image: str
protocols: list[str]
def validate(self) -> list[str]:
errors = super().validate()
# here further validations
allowed_protocols = {"tcp", "udp"}
if not set(self.protocols).issubset(allowed_protocols):
errors.append(
f"{self.protocols} contains not allowed protocols only tcp and udp is allowed"
)
return errors

View File

@@ -3,19 +3,25 @@ import logging
import queue
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import BaseNetworkChaosConfig, NetworkChaosScenarioType
from krkn.scenario_plugins.network_chaos_ng.models import (
BaseNetworkChaosConfig,
NetworkChaosScenarioType,
)
class AbstractNetworkChaosModule(abc.ABC):
"""
The abstract class that needs to be implemented by each Network Chaos Scenario
"""
kubecli: KrknTelemetryOpenshift
base_network_config: BaseNetworkChaosConfig
@abc.abstractmethod
def run(self, target: str, kubecli: KrknTelemetryOpenshift, error_queue: queue.Queue = None):
def run(self, target: str, error_queue: queue.Queue = None):
"""
the entrypoint method for the Network Chaos Scenario
:param target: The resource name that will be targeted by the scenario (Node Name, Pod Name etc.)
:param kubecli: The `KrknTelemetryOpenshift` needed by the scenario to access to the krkn-lib methods
:param error_queue: A queue that will be used by the plugin to push the errors raised during the execution of parallel modules
"""
pass
@@ -28,31 +34,17 @@ class AbstractNetworkChaosModule(abc.ABC):
"""
pass
def get_targets(self) -> list[str]:
"""
checks and returns the targets based on the common scenario configuration
"""
def log_info(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for INFO severity to be used in the scenarios
"""
if parallel:
logging.info(f"[{node_name}]: {message}")
else:
logging.info(message)
pass
def log_warning(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for WARNING severity to be used in the scenarios
"""
if parallel:
logging.warning(f"[{node_name}]: {message}")
else:
logging.warning(message)
def log_error(self, message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for ERROR severity to be used in the scenarios
"""
if parallel:
logging.error(f"[{node_name}]: {message}")
else:
logging.error(message)
def __init__(
self,
base_network_config: BaseNetworkChaosConfig,
kubecli: KrknTelemetryOpenshift,
):
self.kubecli = kubecli
self.base_network_config = base_network_config
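For reference, a minimal sketch of how a concrete module is expected to plug into this refactored API: each module now receives the kubecli at construction time, resolves its own targets, and pushes errors onto the queue when run in parallel. The NoopChaosModule name and its no-op body are illustrative, not part of this change.

import queue
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import (
    BaseNetworkChaosConfig,
    NetworkChaosScenarioType,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
    AbstractNetworkChaosModule,
)

class NoopChaosModule(AbstractNetworkChaosModule):
    def __init__(self, config: BaseNetworkChaosConfig, kubecli: KrknTelemetryOpenshift):
        super().__init__(config, kubecli)

    def run(self, target: str, error_queue: queue.Queue = None):
        try:
            pass  # chaos logic against `target` goes here
        except Exception as e:
            # parallel runs report errors through the queue instead of raising
            if error_queue is None:
                raise
            error_queue.put(str(e))

    def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
        return NetworkChaosScenarioType.Node, self.base_network_config

    def get_targets(self) -> list[str]:
        # modules now resolve their own targets instead of the plugin doing it
        return self.kubecli.get_lib_kubernetes().list_nodes(
            self.base_network_config.label_selector
        )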

View File

@@ -1,11 +1,6 @@
import os
import queue
import time
import yaml
from jinja2 import Environment, FileSystemLoader
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
@@ -16,88 +11,92 @@ from krkn.scenario_plugins.network_chaos_ng.models import (
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
apply_network_rules,
clean_network_rules,
generate_rules,
get_default_interface,
)
class NodeNetworkFilterModule(AbstractNetworkChaosModule):
config: NetworkFilterConfig
kubecli: KrknTelemetryOpenshift
def run(
self,
target: str,
kubecli: KrknTelemetryOpenshift,
error_queue: queue.Queue = None,
):
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_name = f"node-filter-{get_random_string(5)}"
pod_template = env.get_template("templates/network-chaos.j2")
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=self.config.namespace,
host_network=True,
target=target,
workload_image=self.config.image,
)
)
self.log_info(
f"creating pod to filter "
log_info(
f"creating workload to filter node {target} network"
f"ports {','.join([str(port) for port in self.config.ports])}, "
f"ingress:{str(self.config.ingress)}, "
f"egress:{str(self.config.egress)}",
parallel,
target,
)
kubecli.get_lib_kubernetes().create_pod(
pod_body, self.config.namespace, 300
pod_name = f"node-filter-{get_random_string(5)}"
deploy_network_filter_pod(
self.config,
target,
pod_name,
self.kubecli.get_lib_kubernetes(),
)
if len(self.config.interfaces) == 0:
interfaces = [
self.get_default_interface(pod_name, self.config.namespace, kubecli)
get_default_interface(
pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
)
]
self.log_info(f"detected default interface {interfaces[0]}")
log_info(
f"detected default interface {interfaces[0]}", parallel, target
)
else:
interfaces = self.config.interfaces
input_rules, output_rules = self.generate_rules(interfaces)
input_rules, output_rules = generate_rules(interfaces, self.config)
for rule in input_rules:
self.log_info(f"applying iptables INPUT rule: {rule}", parallel, target)
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[rule], pod_name, self.config.namespace
)
for rule in output_rules:
self.log_info(
f"applying iptables OUTPUT rule: {rule}", parallel, target
)
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[rule], pod_name, self.config.namespace
)
self.log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules"
apply_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
parallel,
target,
)
log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
parallel,
target,
)
time.sleep(self.config.test_duration)
self.log_info("removing iptables rules")
for _ in input_rules:
# always deleting the first rule since has been inserted from the top
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[f"iptables -D INPUT 1"], pod_name, self.config.namespace
)
for _ in output_rules:
# always deleting the first rule since has been inserted from the top
kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[f"iptables -D OUTPUT 1"], pod_name, self.config.namespace
)
self.log_info(
f"deleting network chaos pod {pod_name} from {self.config.namespace}"
log_info("removing iptables rules", parallel, target)
clean_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
)
kubecli.get_lib_kubernetes().delete_pod(pod_name, self.config.namespace)
self.kubecli.get_lib_kubernetes().delete_pod(
pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
@@ -105,33 +104,25 @@ class NodeNetworkFilterModule(AbstractNetworkChaosModule):
else:
error_queue.put(str(e))
def __init__(self, config: NetworkFilterConfig):
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
return NetworkChaosScenarioType.Node, self.config
def get_default_interface(
self, pod_name: str, namespace: str, kubecli: KrknTelemetryOpenshift
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.get_lib_kubernetes().exec_cmd_in_pod(
[cmd], pod_name, namespace
)
return output.replace("\n", "")
def get_targets(self) -> list[str]:
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_nodes(
self.base_network_config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
node_info = self.kubecli.get_lib_kubernetes().list_nodes()
if self.config.target not in node_info:
raise Exception(f"node {self.config.target} not found, aborting")
def generate_rules(self, interfaces: list[str]) -> (list[str], list[str]):
input_rules = []
output_rules = []
for interface in interfaces:
for port in self.config.ports:
if self.config.egress:
output_rules.append(
f"iptables -I OUTPUT 1 -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
if self.config.ingress:
input_rules.append(
f"iptables -I INPUT 1 -i {interface} -p tcp --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
return input_rules, output_rules
return [self.config.target]

View File

@@ -0,0 +1,177 @@
import queue
import time
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_random_string
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
NetworkFilterConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
from krkn.scenario_plugins.network_chaos_ng.modules.utils_network_filter import (
deploy_network_filter_pod,
get_default_interface,
generate_namespaced_rules,
apply_network_rules,
clean_network_rules_namespaced,
)
class PodNetworkFilterModule(AbstractNetworkChaosModule):
config: NetworkFilterConfig
def run(self, target: str, error_queue: queue.Queue = None):
parallel = False
if error_queue:
parallel = True
try:
pod_name = f"pod-filter-{get_random_string(5)}"
container_name = f"fedora-container-{get_random_string(5)}"
pod_info = self.kubecli.get_lib_kubernetes().get_pod_info(
target, self.config.namespace
)
log_info(
f"creating workload to filter pod {self.config.target} network"
f"ports {','.join([str(port) for port in self.config.ports])}, "
f"ingress:{str(self.config.ingress)}, "
f"egress:{str(self.config.egress)}",
parallel,
pod_name,
)
if not pod_info:
raise Exception(
f"impossible to retrieve infos for pod {self.config.target} namespace {self.config.namespace}"
)
deploy_network_filter_pod(
self.config,
pod_info.nodeName,
pod_name,
self.kubecli.get_lib_kubernetes(),
container_name,
)
if len(self.config.interfaces) == 0:
interfaces = [
get_default_interface(
pod_name,
self.config.namespace,
self.kubecli.get_lib_kubernetes(),
)
]
log_info(
f"detected default interface {interfaces[0]}",
parallel,
pod_name,
)
else:
interfaces = self.config.interfaces
container_ids = self.kubecli.get_lib_kubernetes().get_container_ids(
target, self.config.namespace
)
if len(container_ids) == 0:
raise Exception(
f"impossible to resolve container id for pod {target} namespace {self.config.namespace}"
)
log_info(f"targeting container {container_ids[0]}", parallel, pod_name)
pids = self.kubecli.get_lib_kubernetes().get_pod_pids(
base_pod_name=pod_name,
base_pod_namespace=self.config.namespace,
base_pod_container_name=container_name,
pod_name=target,
pod_namespace=self.config.namespace,
pod_container_id=container_ids[0],
)
if not pids:
raise Exception(f"impossible to resolve pid for pod {target}")
log_info(
f"resolved pids {pids} in node {pod_info.nodeName} for pod {target}",
parallel,
pod_name,
)
input_rules, output_rules = generate_namespaced_rules(
interfaces, self.config, pids
)
apply_network_rules(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
parallel,
target,
)
log_info(
f"waiting {self.config.test_duration} seconds before removing the iptables rules",
parallel,
pod_name,
)
time.sleep(self.config.test_duration)
log_info("removing iptables rules", parallel, pod_name)
clean_network_rules_namespaced(
self.kubecli.get_lib_kubernetes(),
input_rules,
output_rules,
pod_name,
self.config.namespace,
pids,
)
self.kubecli.get_lib_kubernetes().delete_pod(
pod_name, self.config.namespace
)
except Exception as e:
if error_queue is None:
raise e
else:
error_queue.put(str(e))
def __init__(self, config: NetworkFilterConfig, kubecli: KrknTelemetryOpenshift):
super().__init__(config, kubecli)
self.config = config
def get_config(self) -> (NetworkChaosScenarioType, BaseNetworkChaosConfig):
return NetworkChaosScenarioType.Pod, self.config
def get_targets(self) -> list[str]:
if not self.config.namespace:
raise Exception("namespace not specified, aborting")
if self.base_network_config.label_selector:
return self.kubecli.get_lib_kubernetes().list_pods(
self.config.namespace, self.config.label_selector
)
else:
if not self.config.target:
raise Exception(
"neither node selector nor node_name (target) specified, aborting."
)
if not self.kubecli.get_lib_kubernetes().check_if_pod_exists(
self.config.target, self.config.namespace
):
raise Exception(
f"pod {self.config.target} not found in namespace {self.config.namespace}"
)
return [self.config.target]

View File

@@ -7,10 +7,15 @@ spec:
{% if host_network %}
hostNetwork: true
{%endif%}
hostPID: true
nodeSelector:
kubernetes.io/hostname: {{target}}
tolerations:
- key: "node-role.kubernetes.io/master"
operator: "Exists"
effect: "NoSchedule"
containers:
- name: fedora
- name: {{container_name}}
imagePullPolicy: Always
image: {{workload_image}}
securityContext:

View File

@@ -0,0 +1,31 @@
import logging
def log_info(message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for INFO severity to be used in the scenarios
"""
if parallel:
logging.info(f"[{node_name}]: {message}")
else:
logging.info(message)
def log_error(message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for ERROR severity to be used in the scenarios
"""
if parallel:
logging.error(f"[{node_name}]: {message}")
else:
logging.error(message)
def log_warning(message: str, parallel: bool = False, node_name: str = ""):
"""
log helper method for WARNING severity to be used in the scenarios
"""
if parallel:
logging.warning(f"[{node_name}]: {message}")
else:
logging.warning(message)
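When a module runs its targets in parallel, these helpers prefix each message with the target name so interleaved logs stay readable; a short usage sketch (target name illustrative):

from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info

log_info("applying iptables INPUT rule", parallel=True, node_name="worker-0")
# logs: [worker-0]: applying iptables INPUT rule
log_info("applying iptables INPUT rule")
# serial execution: plain message without the prefix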

View File

@@ -0,0 +1,138 @@
import os
import yaml
from jinja2 import FileSystemLoader, Environment
from krkn_lib.k8s import KrknKubernetes
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
from krkn.scenario_plugins.network_chaos_ng.modules.utils import log_info
def generate_rules(
interfaces: list[str], config: NetworkFilterConfig
) -> (list[str], list[str]):
input_rules = []
output_rules = []
for interface in interfaces:
for port in config.ports:
if config.egress:
for protocol in set(config.protocols):
output_rules.append(
f"iptables -I OUTPUT 1 -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
if config.ingress:
for protocol in set(config.protocols):
input_rules.append(
f"iptables -I INPUT 1 -i {interface} -p {protocol} --dport {port} -m state --state NEW,RELATED,ESTABLISHED -j DROP"
)
return input_rules, output_rules
def generate_namespaced_rules(
interfaces: list[str], config: NetworkFilterConfig, pids: list[str]
) -> (list[str], list[str]):
namespaced_input_rules: list[str] = []
namespaced_output_rules: list[str] = []
input_rules, output_rules = generate_rules(interfaces, config)
for pid in pids:
ns_input_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in input_rules
]
ns_output_rules = [
f"nsenter --target {pid} --net -- {rule}" for rule in output_rules
]
namespaced_input_rules.extend(ns_input_rules)
namespaced_output_rules.extend(ns_output_rules)
return namespaced_input_rules, namespaced_output_rules
def deploy_network_filter_pod(
config: NetworkFilterConfig,
target_node: str,
pod_name: str,
kubecli: KrknKubernetes,
container_name: str = "fedora",
):
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("templates/network-chaos.j2")
pod_body = yaml.safe_load(
pod_template.render(
pod_name=pod_name,
namespace=config.namespace,
host_network=True,
target=target_node,
container_name=container_name,
workload_image=config.image,
)
)
kubecli.create_pod(pod_body, config.namespace, 300)
def apply_network_rules(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
parallel: bool,
node_name: str,
):
for rule in input_rules:
log_info(f"applying iptables INPUT rule: {rule}", parallel, node_name)
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
for rule in output_rules:
log_info(f"applying iptables OUTPUT rule: {rule}", parallel, node_name)
kubecli.exec_cmd_in_pod([rule], pod_name, namespace)
def clean_network_rules(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
):
for _ in input_rules:
# always delete the first rule since it was inserted at the top
kubecli.exec_cmd_in_pod([f"iptables -D INPUT 1"], pod_name, namespace)
for _ in output_rules:
# always delete the first rule since it was inserted at the top
kubecli.exec_cmd_in_pod([f"iptables -D OUTPUT 1"], pod_name, namespace)
def clean_network_rules_namespaced(
kubecli: KrknKubernetes,
input_rules: list[str],
output_rules: list[str],
pod_name: str,
namespace: str,
pids: list[str],
):
for _ in input_rules:
for pid in pids:
# always delete the first rule since it was inserted at the top
kubecli.exec_cmd_in_pod(
[f"nsenter --target {pid} --net -- iptables -D INPUT 1"],
pod_name,
namespace,
)
for _ in output_rules:
for pid in pids:
# always delete the first rule since it was inserted at the top
kubecli.exec_cmd_in_pod(
[f"nsenter --target {pid} --net -- iptables -D OUTPUT 1"],
pod_name,
namespace,
)
def get_default_interface(
pod_name: str, namespace: str, kubecli: KrknKubernetes
) -> str:
cmd = "ip r | grep default | awk '/default/ {print $5}'"
output = kubecli.exec_cmd_in_pod([cmd], pod_name, namespace)
return output.replace("\n", "")

View File

@@ -1,14 +1,25 @@
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.network_chaos_ng.models import NetworkFilterConfig
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import AbstractNetworkChaosModule
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import NodeNetworkFilterModule
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.node_network_filter import (
NodeNetworkFilterModule,
)
from krkn.scenario_plugins.network_chaos_ng.modules.pod_network_filter import (
PodNetworkFilterModule,
)
supported_modules = ["node_network_filter", "pod_network_filter"]
supported_modules = ["node_network_filter"]
class NetworkChaosFactory:
@staticmethod
def get_instance(config: dict[str, str]) -> AbstractNetworkChaosModule:
def get_instance(
config: dict[str, str], kubecli: KrknTelemetryOpenshift
) -> AbstractNetworkChaosModule:
if config["id"] is None:
raise Exception("network chaos id cannot be None")
if config["id"] not in supported_modules:
@@ -19,6 +30,10 @@ class NetworkChaosFactory:
errors = config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return NodeNetworkFilterModule(config)
return NodeNetworkFilterModule(config, kubecli)
if config["id"] == "pod_network_filter":
config = NetworkFilterConfig(**config)
errors = config.validate()
if len(errors) > 0:
raise Exception(f"config validation errors: [{';'.join(errors)}]")
return PodNetworkFilterModule(config, kubecli)

View File

@@ -9,10 +9,6 @@ from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.scenario_plugins.network_chaos_ng.models import (
NetworkChaosScenarioType,
BaseNetworkChaosConfig,
)
from krkn.scenario_plugins.network_chaos_ng.modules.abstract_network_chaos_module import (
AbstractNetworkChaosModule,
)
@@ -39,56 +35,52 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
)
return 1
for config in scenario_config:
network_chaos = NetworkChaosFactory.get_instance(config)
network_chaos_config = network_chaos.get_config()
logging.info(
f"running network_chaos scenario: {network_chaos_config[1].id}"
network_chaos = NetworkChaosFactory.get_instance(
config, lib_telemetry
)
if network_chaos_config[0] == NetworkChaosScenarioType.Node:
targets = lib_telemetry.get_lib_kubernetes().list_nodes(
network_chaos_config[1].label_selector
)
else:
targets = lib_telemetry.get_lib_kubernetes().list_pods(
network_chaos_config[1].namespace,
network_chaos_config[1].label_selector,
)
network_chaos_type, network_chaos_config = (
network_chaos.get_config()
)
logging.info(
f"running network_chaos scenario: {network_chaos_config.id}"
)
targets = network_chaos.get_targets()
if len(targets) == 0:
logging.warning(
f"no targets found for {network_chaos_config[1].id} "
f"network chaos scenario with selector {network_chaos_config[1].label_selector} "
f"with target type {network_chaos_config[0]}"
f"no targets found for {network_chaos_config.id} "
f"network chaos scenario with selector {network_chaos_config.label_selector} "
f"with target type {network_chaos_type}"
)
if network_chaos_config[1].instance_count != 0 and network_chaos_config[1].instance_count > len(targets):
targets = random.sample(targets, network_chaos_config[1].instance_count)
if (
network_chaos_config.instance_count != 0
and network_chaos_config.instance_count < len(targets)
):
targets = random.sample(
targets, network_chaos_config.instance_count
)
if network_chaos_config[1].execution == "parallel":
self.run_parallel(targets, network_chaos, lib_telemetry)
if network_chaos_config.execution == "parallel":
self.run_parallel(targets, network_chaos)
else:
self.run_serial(targets, network_chaos, lib_telemetry)
self.run_serial(targets, network_chaos)
if len(config) > 1:
logging.info(f"waiting {network_chaos_config[1].wait_duration} seconds before running the next "
f"Network Chaos NG Module")
time.sleep(network_chaos_config[1].wait_duration)
logging.info(
f"waiting {network_chaos_config.wait_duration} seconds before running the next "
f"Network Chaos NG Module"
)
time.sleep(network_chaos_config.wait_duration)
except Exception as e:
logging.error(str(e))
return 1
return 0
def run_parallel(
self,
targets: list[str],
module: AbstractNetworkChaosModule,
lib_telemetry: KrknTelemetryOpenshift,
):
def run_parallel(self, targets: list[str], module: AbstractNetworkChaosModule):
error_queue = queue.Queue()
threads = []
errors = []
for target in targets:
thread = threading.Thread(
target=module.run, args=[target, lib_telemetry, error_queue]
)
thread = threading.Thread(target=module.run, args=[target, error_queue])
thread.start()
threads.append(thread)
for thread in threads:
@@ -103,14 +95,9 @@ class NetworkChaosNgScenarioPlugin(AbstractScenarioPlugin):
f"module {module.get_config()[1].id} execution failed: [{';'.join(errors)}]"
)
def run_serial(
self,
targets: list[str],
module: AbstractNetworkChaosModule,
lib_telemetry: KrknTelemetryOpenshift,
):
def run_serial(self, targets: list[str], module: AbstractNetworkChaosModule):
for target in targets:
module.run(target, lib_telemetry)
module.run(target)
def get_scenario_types(self) -> list[str]:
return ["network_chaos_ng_scenarios"]

View File

@@ -10,6 +10,7 @@ import time
import traceback
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.k8s import AffectedNode, AffectedNodeStatus
from krkn_lib.utils import get_random_string
class BM:
def __init__(self, bm_info, user, passwd):
@@ -21,6 +22,17 @@ class BM:
with oc.project("openshift-machine-api"):
return oc.selector("node/" + node_name).object()
def get_bm_disks(self, node_name):
if (
self.bm_info is not None
and node_name in self.bm_info
and "disks" in self.bm_info[node_name]
):
return self.bm_info[node_name]["disks"]
else:
return []
# Get the ipmi or other BMC address of the baremetal node
def get_bmc_addr(self, node_name):
# Addresses in the config get higher priority.
@@ -228,3 +240,104 @@ class bm_node_scenarios(abstract_node_scenarios):
logging.error("node_reboot_scenario injection failed!")
raise e
self.affected_nodes_status.affected_nodes.append(affected_node)
def node_disk_detach_attach_scenario(self, instance_kill_count, node, timeout, duration):
logging.info("Starting disk_detach_attach_scenario injection")
disk_attachment_details = self.get_disk_attachment_info(instance_kill_count, node)
if disk_attachment_details:
self.disk_detach_scenario(instance_kill_count, node, disk_attachment_details, timeout)
logging.info("Waiting for %s seconds before attaching the disk" % (duration))
time.sleep(duration)
self.disk_attach_scenario(instance_kill_count, node, disk_attachment_details)
logging.info("node_disk_detach_attach_scenario has been successfully injected!")
else:
logging.error("Node %s has only root disk attached" % (node))
logging.error("node_disk_detach_attach_scenario failed!")
# Get volume attachment info
def get_disk_attachment_info(self, instance_kill_count, node):
for _ in range(instance_kill_count):
try:
logging.info("Obtaining disk attachment information")
user_disks= self.bm.get_bm_disks(node)
disk_pod_name = f"disk-pod-{get_random_string(5)}"
cmd = '''bootdev=$(lsblk -no PKNAME $(findmnt -no SOURCE /boot));
for path in /sys/block/*/device/state; do
dev=$(basename $(dirname $(dirname "$path")));
[[ "$dev" != "$bootdev" ]] && echo "$dev";
done'''
pod_command = ["chroot /host /bin/sh -c '" + cmd + "'"]
disk_response = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk response: %s" % (disk_response))
node_disks = [disk for disk in disk_response.split("\n") if disk]
logging.info("Node disks: %s" % (node_disks))
offline_disks = [disk for disk in node_disks if disk not in user_disks]
return offline_disks if offline_disks else node_disks
except Exception as e:
logging.error(
"Failed to obtain disk attachment information of %s node. "
"Encounteres following exception: %s." % (node, e)
)
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")
# Node scenario to detach the volume
def disk_detach_scenario(self, instance_kill_count, node, disk_attachment_details, timeout):
for _ in range(instance_kill_count):
try:
logging.info("Starting disk_detach_scenario injection")
logging.info(
"Detaching the %s disks from instance %s "
% (disk_attachment_details, node)
)
disk_pod_name = f"detach-disk-pod-{get_random_string(5)}"
detach_disk_command=''
for disk in disk_attachment_details:
detach_disk_command = detach_disk_command + "echo offline > /sys/block/" + disk + "/device/state;"
pod_command = ["chroot /host /bin/sh -c '" + detach_disk_command + "'"]
cmd_output = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk command output: %s" % (cmd_output))
logging.info("Disk %s has been detached from %s node" % (disk_attachment_details, node))
except Exception as e:
logging.error(
"Failed to detach disk from %s node. Encountered following"
"exception: %s." % (node, e)
)
logging.debug("")
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")
# Node scenario to attach the volume
def disk_attach_scenario(self, instance_kill_count, node, disk_attachment_details):
for _ in range(instance_kill_count):
try:
logging.info(
"Attaching the %s disks from instance %s "
% (disk_attachment_details, node)
)
disk_pod_name = f"attach-disk-pod-{get_random_string(5)}"
attach_disk_command=''
for disk in disk_attachment_details:
attach_disk_command = attach_disk_command + "echo running > /sys/block/" + disk + "/device/state;"
pod_command = ["chroot /host /bin/sh -c '" + attach_disk_command + "'"]
cmd_output = self.kubecli.exec_command_on_node(
node, pod_command, disk_pod_name, "default"
)
logging.info("Disk command output: %s" % (cmd_output))
logging.info("Disk %s has been attached to %s node" % (disk_attachment_details, node))
except Exception as e:
logging.error(
"Failed to attach disk to %s node. Encountered following"
"exception: %s." % (node, e)
)
logging.debug("")
raise RuntimeError()
finally:
self.kubecli.delete_pod(disk_pod_name, "default")

View File

@@ -0,0 +1,21 @@
from dataclasses import dataclass
@dataclass
class InputParams:
def __init__(self, config: dict[str,any] = None):
if config:
self.kill = config["kill"] if "kill" in config else 1
self.timeout = config["timeout"] if "timeout" in config else 120
self.duration = config["duration"] if "duration" in config else 10
self.krkn_pod_recovery_time = config["krkn_pod_recovery_time"] if "krkn_pod_recovery_time" in config else 120
self.label_selector = config["label_selector"] if "label_selector" in config else ""
self.namespace_pattern = config["namespace_pattern"] if "namespace_pattern" in config else ""
self.name_pattern = config["name_pattern"] if "name_pattern" in config else ""
namespace_pattern: str
krkn_pod_recovery_time: int
timeout: int
duration: int
kill: int
label_selector: str
name_pattern: str

View File

@@ -0,0 +1,164 @@
import logging
import random
import time
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn.scenario_plugins.pod_disruption.models.models import InputParams
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import get_yaml_item_value
from datetime import datetime
from dataclasses import dataclass
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
@dataclass
class Pod:
namespace: str
name: str
creation_timestamp : str
class PodDisruptionScenarioPlugin(AbstractScenarioPlugin):
def run(
self,
run_uuid: str,
scenario: str,
krkn_config: dict[str, any],
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
) -> int:
pool = PodsMonitorPool(lib_telemetry.get_lib_kubernetes())
try:
with open(scenario, "r") as f:
cont_scenario_config = yaml.full_load(f)
for kill_scenario in cont_scenario_config:
kill_scenario_config = InputParams(kill_scenario["config"])
self.start_monitoring(
kill_scenario_config, pool
)
return_status = self.killing_pods(
kill_scenario_config, lib_telemetry.get_lib_kubernetes()
)
if return_status != 0:
result = pool.cancel()
else:
result = pool.join()
if result.error:
logging.error(
f"PodDisruptionScenariosPlugin pods failed to recover: {result.error}"
)
return 1
scenario_telemetry.affected_pods = result
except (RuntimeError, Exception) as e:
logging.error("PodDisruptionScenariosPlugin exiting due to Exception %s" % e)
return 1
else:
return 0
def get_scenario_types(self) -> list[str]:
return ["pod_disruption_scenarios"]
def start_monitoring(self, kill_scenario: InputParams, pool: PodsMonitorPool):
recovery_time = kill_scenario.krkn_pod_recovery_time
if (
kill_scenario.namespace_pattern
and kill_scenario.label_selector
):
namespace_pattern = kill_scenario.namespace_pattern
label_selector = kill_scenario.label_selector
pool.select_and_monitor_by_namespace_pattern_and_label(
namespace_pattern=namespace_pattern,
label_selector=label_selector,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod label pattern: {label_selector} namespace pattern: {namespace_pattern}"
)
elif (
kill_scenario.namespace_pattern
and kill_scenario.name_pattern
):
namespace_pattern = kill_scenario.namespace_pattern
name_pattern = kill_scenario.name_pattern
pool.select_and_monitor_by_name_pattern_and_namespace_pattern(
pod_name_pattern=name_pattern,
namespace_pattern=namespace_pattern,
max_timeout=recovery_time,
field_selector="status.phase=Running"
)
logging.info(
f"waiting up to {recovery_time} seconds for pod recovery, "
f"pod name pattern: {name_pattern} namespace pattern: {namespace_pattern}"
)
else:
raise Exception(
f"impossible to determine monitor parameters, check {kill_scenario} configuration"
)
def get_pods(self, name_pattern, label_selector,namespace, kubecli: KrknKubernetes, field_selector: str =None):
if label_selector and name_pattern:
logging.error('Only one of name pattern or label pattern can be specified')
elif label_selector:
pods = kubecli.select_pods_by_namespace_pattern_and_label(label_selector=label_selector,namespace_pattern=namespace, field_selector=field_selector)
elif name_pattern:
pods = kubecli.select_pods_by_name_pattern_and_namespace_pattern(pod_name_pattern=name_pattern, namespace_pattern=namespace, field_selector=field_selector)
else:
logging.error('Name pattern or label pattern must be specified ')
return pods
def killing_pods(self, config: InputParams, kubecli: KrknKubernetes):
# region Select target pods
namespace = config.namespace_pattern
if not namespace:
logging.error('Namespace pattern must be specified')
pods = self.get_pods(config.name_pattern,config.label_selector,config.namespace_pattern, kubecli, field_selector="status.phase=Running")
pods_count = len(pods)
if len(pods) < config.kill:
logging.error("Not enough pods match the criteria, expected {} but found only {} pods".format(
config.kill, len(pods)))
return 1
random.shuffle(pods)
for i in range(config.kill):
pod = pods[i]
logging.info(pod)
logging.info(f'Deleting pod {pod[0]}')
kubecli.delete_pod(pod[0], pod[1])
self.wait_for_pods(config.label_selector,config.name_pattern,config.namespace_pattern, pods_count, config.duration, config.timeout, kubecli)
return 0
def wait_for_pods(
self, label_selector, pod_name, namespace, pod_count, duration, wait_timeout, kubecli: KrknKubernetes
):
timeout = False
start_time = datetime.now()
while not timeout:
pods = self.get_pods(name_pattern=pod_name, label_selector=label_selector,namespace=namespace, field_selector="status.phase=Running", kubecli=kubecli)
if pod_count == len(pods):
return
time.sleep(duration)
now_time = datetime.now()
time_diff = now_time - start_time
if time_diff.seconds > wait_timeout:
logging.error("timeout while waiting for pods to come up")
return 1
return 0

View File

@@ -16,7 +16,7 @@ google-cloud-compute==1.22.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.6
krkn-lib==5.0.2
krkn-lib==5.1.0
lxml==5.1.0
kubernetes==28.1.0
numpy==1.26.4
@@ -36,7 +36,5 @@ werkzeug==3.0.6
wheel==0.42.0
zope.interface==5.4.0
git+https://github.com/krkn-chaos/arcaflow-plugin-kill-pod.git@v0.1.0
git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.0.0
cryptography>=42.0.4 # not directly required, pinned by Snyk to avoid a vulnerability

scenarios/kind/pod_etcd.yml (new executable file, 5 lines added)
View File

@@ -0,0 +1,5 @@
- id: kill-pods
config:
namespace_pattern: "kube-system"
label_selector: "component=etcd"
krkn_pod_recovery_time: 120

View File

@@ -8,7 +8,7 @@
execution: parallel
ingress: false
egress: true
target: node
target: ''
interfaces: []
ports:
- 2049
- 53

View File

@@ -0,0 +1,17 @@
- id: pod_network_filter
image: "quay.io/krkn-chaos/krkn-network-chaos:latest"
wait_duration: 1
test_duration: 60
label_selector: "app=network-attacked"
namespace: 'default'
instance_count: 1
execution: parallel
ingress: false
egress: true
target: ""
interfaces: []
protocols:
- tcp
- udp
ports:
- 53

View File

@@ -19,3 +19,15 @@ node_scenarios:
bmc_addr: mgmt-machine2.example.com
bmc_user: user # The baremetal IPMI user. Overrides the default IPMI user specified above. Optional if the default is set
bmc_password: pass # The baremetal IPMI password. Overrides the default IPMI user specified above. Optional if the default is set
- actions:
- node_disk_detach_attach_scenario
node_name: node-1
instance_count: 1
runs: 1
timeout: 360
duration: 120
parallel: False
cloud_type: bm
bmc_info:
node-1:
disks: ["sda", "sdb"] # List of disk devices to be used for disk detach/attach scenarios