Namespaced cluster events and logs integration (#690)

* namespaced events integration

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* namespaced logs implementation

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* namespaced logs plugin scenario

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* namespaced logs integration

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* logs collection fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* krkn-lib 3.1.0 update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
Author: Tullio Sebastiani
Date: 2024-09-12 11:54:57 +02:00
Committed by: GitHub
Parent: 5e7938ba4a
Commit: 736c90e937
17 changed files with 452 additions and 138 deletions
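
Every scenario runner in this diff is converted to the same shape: the separate KrknKubernetes client and KrknTelemetryKubernetes arguments are replaced by a single KrknTelemetryOpenshift (which carries the client as telemetry.kubecli) plus the telemetry request id, and each scenario now finishes by uploading namespaced logs and attaching cluster events. A minimal sketch of the shared pattern, condensed from the file diffs below (simplified, error handling omitted):

import time
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kraken import utils

def run(scenarios_list,
        telemetry: KrknTelemetryOpenshift,
        telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
    failed_scenarios = []
    scenario_telemetries: list[ScenarioTelemetry] = []
    for scenario in scenarios_list:
        scenario_telemetry = ScenarioTelemetry()
        scenario_telemetry.scenario = scenario
        scenario_telemetry.start_timestamp = time.time()
        # set_parameters_base64() now returns the parsed scenario config,
        # later used to resolve the namespaces the scenario touched
        parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
        # ... inject the chaos through telemetry.kubecli instead of a separate kubecli ...
        scenario_telemetry.end_timestamp = time.time()
        utils.collect_and_put_ocp_logs(telemetry,
                                       parsed_scenario_config,
                                       telemetry_request_id,
                                       int(scenario_telemetry.start_timestamp),
                                       int(scenario_telemetry.end_timestamp))
        utils.populate_cluster_events(scenario_telemetry,
                                      parsed_scenario_config,
                                      telemetry.kubecli,
                                      int(scenario_telemetry.start_timestamp),
                                      int(scenario_telemetry.end_timestamp))
        scenario_telemetries.append(scenario_telemetry)
    return failed_scenarios, scenario_telemetries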

View File

@@ -1,18 +1,24 @@
import yaml
import logging
import time
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import kraken.cerberus.setup as cerberus
from jinja2 import Template
import kraken.invoke.command as runcommand
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception
from kraken import utils
# Reads the scenario config, applies and deletes a network policy to
# block the traffic for the specified duration
def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
failed_post_scenarios = ""
scenario_telemetries: list[ScenarioTelemetry] = []
failed_scenarios = []
@@ -20,7 +26,7 @@ def run(scenarios_list, config, wait_duration,kubecli: KrknKubernetes, telemetry
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = app_outage_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_outage_config)
if len(app_outage_config) > 1:
try:
with open(app_outage_config, "r") as f:
@@ -57,7 +63,7 @@ spec:
# Block the traffic by creating network policy
logging.info("Creating the network policy")
kubecli.create_net_policy(yaml_spec, namespace)
telemetry.kubecli.create_net_policy(yaml_spec, namespace)
# wait for the specified duration
logging.info("Waiting for the specified duration in the config: %s" % (duration))
@@ -65,7 +71,7 @@ spec:
# unblock the traffic by deleting the network policy
logging.info("Deleting the network policy")
kubecli.delete_net_policy("kraken-deny", namespace)
telemetry.kubecli.delete_net_policy("kraken-deny", namespace)
logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
@@ -79,6 +85,16 @@ spec:
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -5,23 +5,47 @@ import yaml
import logging
from pathlib import Path
from typing import List
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from .context_auth import ContextAuth
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from .. import utils
def run(scenarios_list: List[str], kubeconfig_path: str, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list: List[str],
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str
) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
failed_post_scenarios = []
for scenario in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry,scenario)
start_time = time.time()
scenario_telemetry.start_timestamp = start_time
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
engine_args = build_args(scenario)
status_code = run_workflow(engine_args, kubeconfig_path)
scenario_telemetry.end_timestamp = time.time()
status_code = run_workflow(engine_args, telemetry.kubecli.get_kubeconfig_path())
end_time = time.time()
scenario_telemetry.end_timestamp = end_time
scenario_telemetry.exit_status = status_code
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(start_time),
int(end_time))
# design proposal for the namespaced logs collection:
# see the latest krkn-lib commit for the corresponding library-side changes
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(start_time),
int(end_time))
scenario_telemetries.append(scenario_telemetry)
if status_code != 0:
failed_post_scenarios.append(scenario)

View File

@@ -3,19 +3,26 @@ import logging
import time
import os
import random
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import kraken.cerberus.setup as cerberus
import kraken.node_actions.common_node_functions as common_node_functions
from jinja2 import Environment, FileSystemLoader
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception
from kraken import utils
# krkn_lib
# Reads the scenario config and introduces traffic variations in Node's host network interface.
def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
failed_post_scenarios = ""
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
logging.info("Runing the Network Chaos tests")
failed_post_scenarios = ""
scenario_telemetries: list[ScenarioTelemetry] = []
@@ -24,7 +31,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = net_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, net_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, net_config)
try:
with open(net_config, "r") as file:
param_lst = ["latency", "loss", "bandwidth"]
@@ -56,11 +63,11 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
node_name_list = [test_node]
nodelst = []
for single_node_name in node_name_list:
nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, kubecli))
nodelst.extend(common_node_functions.get_node(single_node_name, test_node_label, test_instance_count, telemetry.kubecli))
file_loader = FileSystemLoader(os.path.abspath(os.path.dirname(__file__)))
env = Environment(loader=file_loader, autoescape=True)
pod_template = env.get_template("pod.j2")
test_interface = verify_interface(test_interface, nodelst, pod_template, kubecli)
test_interface = verify_interface(test_interface, nodelst, pod_template, telemetry.kubecli)
joblst = []
egress_lst = [i for i in param_lst if i in test_egress]
chaos_config = {
@@ -86,13 +93,13 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
job_template.render(jobname=i + str(hash(node))[:5], nodename=node, cmd=exec_cmd)
)
joblst.append(job_body["metadata"]["name"])
api_response = kubecli.create_job(job_body)
api_response = telemetry.kubecli.create_job(job_body)
if api_response is None:
raise Exception("Error creating job")
if test_execution == "serial":
logging.info("Waiting for serial job to finish")
start_time = int(time.time())
wait_for_job(joblst[:], kubecli, test_duration + 300)
wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
logging.info("Waiting for wait_duration %s" % wait_duration)
time.sleep(wait_duration)
end_time = int(time.time())
@@ -102,7 +109,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
if test_execution == "parallel":
logging.info("Waiting for parallel job to finish")
start_time = int(time.time())
wait_for_job(joblst[:], kubecli, test_duration + 300)
wait_for_job(joblst[:], telemetry.kubecli, test_duration + 300)
logging.info("Waiting for wait_duration %s" % wait_duration)
time.sleep(wait_duration)
end_time = int(time.time())
@@ -112,13 +119,24 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
raise RuntimeError()
finally:
logging.info("Deleting jobs")
delete_job(joblst[:], kubecli)
delete_job(joblst[:], telemetry.kubecli)
except (RuntimeError, Exception):
scenario_telemetry.exit_status = 1
failed_scenarios.append(net_config)
log_exception(net_config)
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -2,6 +2,10 @@ import yaml
import logging
import sys
import time
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kraken import utils
from kraken.node_actions.aws_node_scenarios import aws_node_scenarios
from kraken.node_actions.general_cloud_node_scenarios import general_node_scenarios
from kraken.node_actions.az_node_scenarios import azure_node_scenarios
@@ -55,23 +59,27 @@ def get_node_scenario_object(node_scenario, kubecli: KrknKubernetes):
# Run defined scenarios
# krkn_lib
def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
failed_scenarios = []
for node_scenario_config in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = node_scenario_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, node_scenario_config)
with open(node_scenario_config, "r") as f:
node_scenario_config = yaml.full_load(f)
for node_scenario in node_scenario_config["node_scenarios"]:
node_scenario_object = get_node_scenario_object(node_scenario, kubecli)
node_scenario_object = get_node_scenario_object(node_scenario, telemetry.kubecli)
if node_scenario["actions"]:
for action in node_scenario["actions"]:
start_time = int(time.time())
try:
inject_node_scenario(action, node_scenario, node_scenario_object, kubecli)
inject_node_scenario(action, node_scenario, node_scenario_object, telemetry.kubecli)
logging.info("Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
end_time = int(time.time())
@@ -85,6 +93,16 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -9,9 +9,11 @@ from arcaflow_plugin_sdk import schema, serialization, jsonschema
from arcaflow_plugin_kill_pod import kill_pods, wait_for_pods
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import kraken.plugins.node_scenarios.vmware_plugin as vmware_plugin
import kraken.plugins.node_scenarios.ibmcloud_plugin as ibmcloud_plugin
from kraken import utils
from kraken.plugins.run_python_plugin import run_python_file
from kraken.plugins.network.ingress_shaping import network_chaos
from kraken.plugins.pod_network_outage.pod_network_outage_plugin import pod_outage
@@ -249,13 +251,12 @@ PLUGINS = Plugins(
def run(scenarios: List[str],
kubeconfig_path: str,
kraken_config: str,
failed_post_scenarios: List[str],
wait_duration: int,
telemetry: KrknTelemetryKubernetes,
kubecli: KrknKubernetes,
run_uuid: str
telemetry: KrknTelemetryOpenshift,
run_uuid: str,
telemetry_request_id: str,
) -> (List[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
@@ -263,14 +264,14 @@ def run(scenarios: List[str],
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
logging.info('scenario ' + str(scenario))
pool = PodsMonitorPool(kubecli)
pool = PodsMonitorPool(telemetry.kubecli)
kill_scenarios = [kill_scenario for kill_scenario in PLUGINS.unserialize_scenario(scenario) if kill_scenario["id"] == "kill-pods"]
try:
start_monitoring(pool, kill_scenarios)
PLUGINS.run(scenario, kubeconfig_path, kraken_config, run_uuid)
PLUGINS.run(scenario, telemetry.kubecli.get_kubeconfig_path(), kraken_config, run_uuid)
result = pool.join()
scenario_telemetry.affected_pods = result
if result.error:
@@ -286,8 +287,19 @@ def run(scenarios: List[str],
scenario_telemetry.exit_status = 0
logging.info("Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
scenario_telemetries.append(scenario_telemetry)
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_post_scenarios, scenario_telemetries

View File

@@ -7,15 +7,17 @@ import sys
import random
import arcaflow_plugin_kill_pod
from krkn_lib.k8s.pods_monitor_pool import PodsMonitorPool
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import kraken.cerberus.setup as cerberus
import kraken.post_actions.actions as post_actions
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from arcaflow_plugin_sdk import serialization
from krkn_lib.utils.functions import get_yaml_item_value, log_exception
from kraken import utils
# Run pod based scenarios
def run(kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_duration):
@@ -73,25 +75,26 @@ def run(kubeconfig_path, scenarios_list, config, failed_post_scenarios, wait_dur
# krkn_lib
def container_run(kubeconfig_path,
def container_run(
scenarios_list,
config,
failed_post_scenarios,
wait_duration,
kubecli: KrknKubernetes,
telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str
) -> (list[str], list[ScenarioTelemetry]):
failed_scenarios = []
scenario_telemetries: list[ScenarioTelemetry] = []
pool = PodsMonitorPool(kubecli)
pool = PodsMonitorPool(telemetry.kubecli)
for container_scenario_config in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = container_scenario_config[0]
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, container_scenario_config[0])
if len(container_scenario_config) > 1:
pre_action_output = post_actions.run(kubeconfig_path, container_scenario_config[1])
pre_action_output = post_actions.run(telemetry.kubecli.get_kubeconfig_path(), container_scenario_config[1])
else:
pre_action_output = ""
with open(container_scenario_config[0], "r") as f:
@@ -101,7 +104,7 @@ def container_run(kubeconfig_path,
# capture start time
start_time = int(time.time())
try:
killed_containers = container_killing_in_pod(cont_scenario, kubecli)
killed_containers = container_killing_in_pod(cont_scenario, telemetry.kubecli)
logging.info(f"killed containers: {str(killed_containers)}")
result = pool.join()
if result.error:
@@ -125,6 +128,16 @@ def container_run(kubeconfig_path,
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -3,15 +3,21 @@ import random
import re
import time
import yaml
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from .. import utils
from ..cerberus import setup as cerberus
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception
# krkn_lib
def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
"""
Reads the scenario config and creates a temp file to fill up the PVC
"""
@@ -22,7 +28,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = app_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, app_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, app_config)
try:
if len(app_config) > 1:
with open(app_config, "r") as f:
@@ -85,7 +91,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
"pod_name '%s' will be overridden with one of "
"the pods mounted in the PVC" % (str(pod_name))
)
pvc = kubecli.get_pvc_info(pvc_name, namespace)
pvc = telemetry.kubecli.get_pvc_info(pvc_name, namespace)
try:
# random generator not used for
# security/cryptographic purposes.
@@ -100,7 +106,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
raise RuntimeError()
# Get volume name
pod = kubecli.get_pod_info(name=pod_name, namespace=namespace)
pod = telemetry.kubecli.get_pod_info(name=pod_name, namespace=namespace)
if pod is None:
logging.error(
@@ -117,7 +123,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
if volume.pvcName is not None:
volume_name = volume.name
pvc_name = volume.pvcName
pvc = kubecli.get_pvc_info(pvc_name, namespace)
pvc = telemetry.kubecli.get_pvc_info(pvc_name, namespace)
break
if 'pvc' not in locals():
logging.error(
@@ -144,7 +150,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
# Get PVC capacity and used bytes
command = "df %s -B 1024 | sed 1d" % (str(mount_path))
command_output = (
kubecli.exec_cmd_in_pod(
telemetry.kubecli.exec_cmd_in_pod(
command,
pod_name,
namespace,
@@ -206,7 +212,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
logging.debug(
"Create temp file in the PVC command:\n %s" % command
)
kubecli.exec_cmd_in_pod(
telemetry.kubecli.exec_cmd_in_pod(
command,
pod_name,
namespace,
@@ -216,7 +222,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
# Check if file is created
command = "ls -lh %s" % (str(mount_path))
logging.debug("Check file is created command:\n %s" % command)
response = kubecli.exec_cmd_in_pod(
response = telemetry.kubecli.exec_cmd_in_pod(
command, pod_name, namespace, container_name
)
logging.info("\n" + str(response))
@@ -238,7 +244,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
container_name,
mount_path,
file_size_kb,
kubecli
telemetry.kubecli
)
# sys.exit(1)
raise RuntimeError()
@@ -275,14 +281,14 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
logging.debug(
"Create temp file in the PVC command:\n %s" % command
)
kubecli.exec_cmd_in_pod(
telemetry.kubecli.exec_cmd_in_pod(
command, pod_name, namespace, container_name
)
# Check if file is created
command = "ls -lh %s" % (str(mount_path))
logging.debug("Check file is created command:\n %s" % command)
response = kubecli.exec_cmd_in_pod(
response = telemetry.kubecli.exec_cmd_in_pod(
command, pod_name, namespace, container_name
)
logging.info("\n" + str(response))
@@ -303,7 +309,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
container_name,
mount_path,
file_size_kb,
kubecli
telemetry.kubecli
)
logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
@@ -321,6 +327,18 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
log_exception(app_config)
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -1,14 +1,18 @@
import time
import random
import logging
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
import kraken.cerberus.setup as cerberus
import kraken.post_actions.actions as post_actions
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception
from kraken import utils
def delete_objects(kubecli, namespace):
@@ -156,9 +160,8 @@ def run(
config,
wait_duration,
failed_post_scenarios,
kubeconfig_path,
kubecli: KrknKubernetes,
telemetry: KrknTelemetryKubernetes
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str
) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
failed_scenarios = []
@@ -166,10 +169,10 @@ def run(
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario_config[0]
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario_config[0])
try:
if len(scenario_config) > 1:
pre_action_output = post_actions.run(kubeconfig_path, scenario_config[1])
pre_action_output = post_actions.run(telemetry.kubecli.get_kubeconfig_path(), scenario_config[1])
else:
pre_action_output = ""
with open(scenario_config[0], "r") as f:
@@ -206,7 +209,7 @@ def run(
start_time = int(time.time())
for i in range(run_count):
killed_namespaces = {}
namespaces = kubecli.check_namespaces([scenario_namespace], scenario_label)
namespaces = telemetry.kubecli.check_namespaces([scenario_namespace], scenario_label)
for j in range(delete_count):
if len(namespaces) == 0:
logging.error(
@@ -220,7 +223,7 @@ def run(
logging.info('Delete objects in selected namespace: ' + selected_namespace )
try:
# delete all pods in namespace
objects = delete_objects(kubecli,selected_namespace)
objects = delete_objects(telemetry.kubecli,selected_namespace)
killed_namespaces[selected_namespace] = objects
logging.info("Deleted all objects in namespace %s was successful" % str(selected_namespace))
except Exception as e:
@@ -236,7 +239,7 @@ def run(
if len(scenario_config) > 1:
try:
failed_post_scenarios = post_actions.check_recovery(
kubeconfig_path, scenario_config, failed_post_scenarios, pre_action_output
telemetry.kubecli.get_kubeconfig_path(), scenario_config, failed_post_scenarios, pre_action_output
)
except Exception as e:
logging.error("Failed to run post action checks: %s" % e)
@@ -244,7 +247,7 @@ def run(
# sys.exit(1)
raise RuntimeError()
else:
failed_post_scenarios = check_all_running_deployment(killed_namespaces, wait_time, kubecli)
failed_post_scenarios = check_all_running_deployment(killed_namespaces, wait_time, telemetry.kubecli)
end_time = int(time.time())
cerberus.publish_kraken_status(config, failed_post_scenarios, start_time, end_time)
@@ -255,6 +258,16 @@ def run(
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -1,20 +1,26 @@
import logging
import time
import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.utils import log_exception
from kraken import utils
def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries= list[ScenarioTelemetry]()
def run(scenarios_list: list[str],
wait_duration: int,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries = list[ScenarioTelemetry]()
failed_post_scenarios = []
for scenario in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
with open(scenario) as stream:
scenario_config = yaml.safe_load(stream)
@@ -26,9 +32,9 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
chaos_duration = scenario_config["chaos_duration"]
logging.info(f"checking service {service_name} in namespace: {service_namespace}")
if not krkn_lib.service_exists(service_name, service_namespace):
if not telemetry.kubecli.service_exists(service_name, service_namespace):
logging.error(f"service: {service_name} not found in namespace: {service_namespace}, failed to run scenario.")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break
try:
@@ -37,18 +43,18 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
# both named ports and port numbers can be used
if isinstance(target_port, int):
logging.info(f"webservice will listen on port {target_port}")
webservice = krkn_lib.deploy_service_hijacking(service_namespace, plan, image, port_number=target_port)
webservice = telemetry.kubecli.deploy_service_hijacking(service_namespace, plan, image, port_number=target_port)
else:
logging.info(f"traffic will be redirected to named port: {target_port}")
webservice = krkn_lib.deploy_service_hijacking(service_namespace, plan, image, port_name=target_port)
webservice = telemetry.kubecli.deploy_service_hijacking(service_namespace, plan, image, port_name=target_port)
logging.info(f"successfully deployed pod: {webservice.pod_name} "
f"in namespace:{service_namespace} with selector {webservice.selector}!"
)
logging.info(f"patching service: {service_name} to hijack traffic towards: {webservice.pod_name}")
original_service = krkn_lib.replace_service_selector([webservice.selector], service_name, service_namespace)
original_service = telemetry.kubecli.replace_service_selector([webservice.selector], service_name, service_namespace)
if original_service is None:
logging.error(f"failed to patch service: {service_name}, namespace: {service_namespace} with selector {webservice.selector}")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break
@@ -58,33 +64,40 @@ def run(scenarios_list: list[str],wait_duration: int, krkn_lib: KrknKubernetes,
time.sleep(chaos_duration)
selectors = ["=".join([key, original_service["spec"]["selector"][key]]) for key in original_service["spec"]["selector"].keys()]
logging.info(f"restoring the service selectors {selectors}")
original_service = krkn_lib.replace_service_selector(selectors, service_name, service_namespace)
original_service = telemetry.kubecli.replace_service_selector(selectors, service_name, service_namespace)
if original_service is None:
logging.error(f"failed to restore original service: {service_name}, namespace: {service_namespace} with selectors: {selectors}")
fail(scenario_telemetry, scenario_telemetries)
fail_scenario_telemetry(scenario_telemetry)
failed_post_scenarios.append(scenario)
break
logging.info("selectors successfully restored")
logging.info("undeploying service-hijacking resources...")
krkn_lib.undeploy_service_hijacking(webservice)
telemetry.kubecli.undeploy_service_hijacking(webservice)
logging.info("End of scenario. Waiting for the specified duration: %s" % (wait_duration))
time.sleep(wait_duration)
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
scenario_telemetries.append(scenario_telemetry)
logging.info("success")
except Exception as e:
logging.error(f"scenario {scenario} failed with exception: {e}")
fail(scenario_telemetry, scenario_telemetries)
failed_post_scenarios.append(scenario)
fail_scenario_telemetry(scenario_telemetry)
log_exception(scenario)
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_post_scenarios, scenario_telemetries
def fail(scenario_telemetry: ScenarioTelemetry, scenario_telemetries: list[ScenarioTelemetry]):
def fail_scenario_telemetry(scenario_telemetry: ScenarioTelemetry):
scenario_telemetry.exit_status = 1
scenario_telemetry.end_timestamp = time.time()
scenario_telemetries.append(scenario_telemetry)
scenario_telemetry.end_timestamp = time.time()

View File

@@ -3,6 +3,10 @@ import yaml
import logging
import time
from multiprocessing.pool import ThreadPool
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from .. import utils
from ..cerberus import setup as cerberus
from ..post_actions import actions as post_actions
from ..node_actions.aws_node_scenarios import AWS
@@ -10,7 +14,6 @@ from ..node_actions.openstack_node_scenarios import OPENSTACKCLOUD
from ..node_actions.az_node_scenarios import Azure
from ..node_actions.gcp_node_scenarios import GCP
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import log_exception
@@ -134,7 +137,11 @@ def cluster_shut_down(shut_down_config, kubecli: KrknKubernetes):
# krkn_lib
def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
failed_post_scenarios = []
failed_scenarios = []
scenario_telemetries: list[ScenarioTelemetry] = []
@@ -153,7 +160,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = config_path
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, config_path)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, config_path)
with open(config_path, "r") as f:
shut_down_config_yaml = yaml.full_load(f)
@@ -161,7 +168,7 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
shut_down_config_yaml["cluster_shut_down_scenario"]
start_time = int(time.time())
try:
cluster_shut_down(shut_down_config_scenario, kubecli)
cluster_shut_down(shut_down_config_scenario, telemetry.kubecli)
logging.info(
"Waiting for the specified duration: %s" % (wait_duration)
)
@@ -185,6 +192,16 @@ def run(scenarios_list, config, wait_duration, kubecli: KrknKubernetes, telemetr
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -8,31 +8,37 @@ import yaml
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kraken import utils
def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list: list[str],
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str
) -> (list[str], list[ScenarioTelemetry]):
scenario_telemetries: list[ScenarioTelemetry] = []
failed_post_scenarios = []
for scenario in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = scenario
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, scenario)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, scenario)
try:
pod_names = []
config = parse_config(scenario)
if config["target-service-label"]:
target_services = krkn_kubernetes.select_service_by_label(config["namespace"], config["target-service-label"])
target_services = telemetry.kubecli.select_service_by_label(config["namespace"], config["target-service-label"])
else:
target_services = [config["target-service"]]
for target in target_services:
if not krkn_kubernetes.service_exists(target, config["namespace"]):
if not telemetry.kubecli.service_exists(target, config["namespace"]):
raise Exception(f"{target} service not found")
for i in range(config["number-of-pods"]):
pod_name = "syn-flood-" + krkn_lib.utils.get_random_string(10)
krkn_kubernetes.deploy_syn_flood(pod_name,
telemetry.kubecli.deploy_syn_flood(pod_name,
config["namespace"],
config["image"],
target,
@@ -49,7 +55,7 @@ def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: K
finished_pods = []
while not did_finish:
for pod_name in pod_names:
if not krkn_kubernetes.is_pod_running(pod_name, config["namespace"]):
if not telemetry.kubecli.is_pod_running(pod_name, config["namespace"]):
finished_pods.append(pod_name)
if set(pod_names) == set(finished_pods):
did_finish = True
@@ -62,6 +68,16 @@ def run(scenarios_list: list[str], krkn_kubernetes: KrknKubernetes, telemetry: K
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_post_scenarios, scenario_telemetries

View File

@@ -6,12 +6,12 @@ import re
import yaml
import random
from krkn_lib import utils
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from kubernetes.client import ApiException
from .. import utils
from ..cerberus import setup as cerberus
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import get_yaml_item_value, log_exception, get_random_string
@@ -348,21 +348,25 @@ def check_date_time(object_type, names, kubecli:KrknKubernetes):
# krkn_lib
def run(scenarios_list, config, wait_duration, kubecli:KrknKubernetes, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]):
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]):
failed_scenarios = []
scenario_telemetries: list[ScenarioTelemetry] = []
for time_scenario_config in scenarios_list:
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = time_scenario_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, time_scenario_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, time_scenario_config)
try:
with open(time_scenario_config, "r") as f:
scenario_config = yaml.full_load(f)
for time_scenario in scenario_config["time_scenarios"]:
start_time = int(time.time())
object_type, object_names = skew_time(time_scenario, kubecli)
not_reset = check_date_time(object_type, object_names, kubecli)
object_type, object_names = skew_time(time_scenario, telemetry.kubecli)
not_reset = check_date_time(object_type, object_names, telemetry.kubecli)
if len(not_reset) > 0:
logging.info("Object times were not reset")
logging.info(
@@ -383,6 +387,16 @@ def run(scenarios_list, config, wait_duration, kubecli:KrknKubernetes, telemetry
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -1 +1,2 @@
from .TeeLogHandler import TeeLogHandler
from .functions import *

kraken/utils/functions.py (new file, 60 additions)
View File

@@ -0,0 +1,60 @@
import krkn_lib.utils
from krkn_lib.k8s import KrknKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from tzlocal.unix import get_localzone
def populate_cluster_events(scenario_telemetry: ScenarioTelemetry,
scenario_config: dict,
kubecli: KrknKubernetes,
start_timestamp: int,
end_timestamp: int
):
events = []
namespaces = __retrieve_namespaces(scenario_config, kubecli)
if len(namespaces) == 0:
events.extend(kubecli.collect_and_parse_cluster_events(start_timestamp, end_timestamp, str(get_localzone())))
else:
for namespace in namespaces:
events.extend(kubecli.collect_and_parse_cluster_events(start_timestamp, end_timestamp, str(get_localzone()),
namespace=namespace))
scenario_telemetry.set_cluster_events(events)
def collect_and_put_ocp_logs(telemetry_ocp: KrknTelemetryOpenshift,
scenario_config: dict,
request_id: str,
start_timestamp: int,
end_timestamp: int,
):
if (
telemetry_ocp.krkn_telemetry_config and
telemetry_ocp.krkn_telemetry_config["enabled"] and
telemetry_ocp.krkn_telemetry_config["logs_backup"] and
not telemetry_ocp.kubecli.is_kubernetes()
):
namespaces = __retrieve_namespaces(scenario_config, telemetry_ocp.kubecli)
if len(namespaces) > 0:
for namespace in namespaces:
telemetry_ocp.put_ocp_logs(request_id,
telemetry_ocp.krkn_telemetry_config,
start_timestamp,
end_timestamp,
namespace)
else:
telemetry_ocp.put_ocp_logs(request_id,
telemetry_ocp.krkn_telemetry_config,
start_timestamp,
end_timestamp)
def __retrieve_namespaces(scenario_config: dict, kubecli: KrknKubernetes) -> set[str]:
namespaces = list()
namespaces.extend(krkn_lib.utils.deep_get_attribute("namespace", scenario_config))
namespace_patterns = krkn_lib.utils.deep_get_attribute("namespace_pattern", scenario_config)
for pattern in namespace_patterns:
namespaces.extend(kubecli.list_namespaces_by_regex(pattern))
return set(namespaces)
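
For illustration, a short sketch of the namespace resolution above (the config shape here is hypothetical — deep_get_attribute() finds namespace and namespace_pattern keys at any nesting depth, and kubecli is assumed to be a connected KrknKubernetes client):

# hypothetical parsed scenario config; the key nesting is irrelevant
scenario_config = {
    "scenarios": [
        {"namespace": "payments"},               # collected literally
        {"namespace_pattern": "^openshift-.*$"}, # expanded via list_namespaces_by_regex()
    ]
}
# yields {"payments"} plus every namespace matching the pattern; when the
# resulting set is empty, populate_cluster_events() falls back to cluster-wide
# event collection and collect_and_put_ocp_logs() calls put_ocp_logs()
# without a namespace argument
namespaces = __retrieve_namespaces(scenario_config, kubecli)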

View File

@@ -1,13 +1,20 @@
import yaml
import logging
import time
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from .. import utils
from ..node_actions.aws_node_scenarios import AWS
from ..cerberus import setup as cerberus
from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.models.telemetry import ScenarioTelemetry
from krkn_lib.utils.functions import log_exception
def run(scenarios_list, config, wait_duration, telemetry: KrknTelemetryKubernetes) -> (list[str], list[ScenarioTelemetry]) :
def run(scenarios_list,
config,
wait_duration,
telemetry: KrknTelemetryOpenshift,
telemetry_request_id: str) -> (list[str], list[ScenarioTelemetry]) :
"""
filters the subnet of interest and applies the network acl
to create zone outage
@@ -20,7 +27,7 @@ def run(scenarios_list, config, wait_duration, telemetry: KrknTelemetryKubernete
scenario_telemetry = ScenarioTelemetry()
scenario_telemetry.scenario = zone_outage_config
scenario_telemetry.start_timestamp = time.time()
telemetry.set_parameters_base64(scenario_telemetry, zone_outage_config)
parsed_scenario_config = telemetry.set_parameters_base64(scenario_telemetry, zone_outage_config)
try:
if len(zone_outage_config) > 1:
with open(zone_outage_config, "r") as f:
@@ -116,6 +123,16 @@ def run(scenarios_list, config, wait_duration, telemetry: KrknTelemetryKubernete
else:
scenario_telemetry.exit_status = 0
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(telemetry,
parsed_scenario_config,
telemetry_request_id,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
utils.populate_cluster_events(scenario_telemetry,
parsed_scenario_config,
telemetry.kubecli,
int(scenario_telemetry.start_timestamp),
int(scenario_telemetry.end_timestamp))
scenario_telemetries.append(scenario_telemetry)
return failed_scenarios, scenario_telemetries

View File

@@ -15,7 +15,7 @@ google-api-python-client==2.116.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.4
krkn-lib==3.0.0
krkn-lib==3.1.0
lxml==5.1.0
kubernetes==28.1.0
numpy==1.26.4

View File

@@ -14,6 +14,8 @@ from krkn_lib.elastic.krkn_elastic import KrknElastic
from krkn_lib.models.elastic import ElasticChaosRunTelemetry
from krkn_lib.models.krkn import ChaosRunOutput, ChaosRunAlertSummary
from krkn_lib.prometheus.krkn_prometheus import KrknPrometheus
from tzlocal.unix import get_localzone
import kraken.time_actions.common_time_functions as time_actions
import kraken.performance_dashboards.setup as performance_dashboards
import kraken.pod_scenarios.setup as pod_scenarios
@@ -235,8 +237,8 @@ def main(cfg) -> int:
logging.info("Cluster version CRD not detected, skipping")
# KrknTelemetry init
telemetry_k8s = KrknTelemetryKubernetes(safe_logger, kubecli)
telemetry_ocp = KrknTelemetryOpenshift(safe_logger, ocpcli)
telemetry_k8s = KrknTelemetryKubernetes(safe_logger, kubecli, config["telemetry"])
telemetry_ocp = KrknTelemetryOpenshift(safe_logger, ocpcli, config["telemetry"])
if enable_elastic:
elastic_search = KrknElastic(safe_logger,
elastic_url,
@@ -315,33 +317,33 @@ def main(cfg) -> int:
return 1
elif scenario_type == "arcaflow_scenarios":
failed_post_scenarios, scenario_telemetries = arcaflow_plugin.run(
scenarios_list, kubeconfig_path, telemetry_k8s
scenarios_list,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
elif scenario_type == "plugin_scenarios":
failed_post_scenarios, scenario_telemetries = plugins.run(
scenarios_list,
kubeconfig_path,
kraken_config,
failed_post_scenarios,
wait_duration,
telemetry_k8s,
kubecli,
run_uuid
telemetry_ocp,
run_uuid,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# krkn_lib
elif scenario_type == "container_scenarios":
logging.info("Running container scenarios")
failed_post_scenarios, scenario_telemetries = pod_scenarios.container_run(
kubeconfig_path,
scenarios_list,
config,
failed_post_scenarios,
wait_duration,
kubecli,
telemetry_k8s
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
@@ -349,14 +351,21 @@ def main(cfg) -> int:
# krkn_lib
elif scenario_type == "node_scenarios":
logging.info("Running node scenarios")
failed_post_scenarios, scenario_telemetries = nodeaction.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = nodeaction.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Inject managedcluster chaos scenarios specified in the config
# krkn_lib
elif scenario_type == "managedcluster_scenarios":
logging.info("Running managedcluster scenarios")
managedcluster_scenarios.run(
scenarios_list, config, wait_duration, kubecli
scenarios_list,
config,
wait_duration,
kubecli
)
# Inject time skew chaos scenarios specified
@@ -364,12 +373,22 @@ def main(cfg) -> int:
# krkn_lib
elif scenario_type == "time_scenarios":
logging.info("Running time skew scenarios")
failed_post_scenarios, scenario_telemetries = time_actions.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = time_actions.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Inject cluster shutdown scenarios
# krkn_lib
elif scenario_type == "cluster_shut_down_scenarios":
failed_post_scenarios, scenario_telemetries = shut_down.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = shut_down.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Inject namespace chaos scenarios
@@ -381,43 +400,69 @@ def main(cfg) -> int:
config,
wait_duration,
failed_post_scenarios,
kubeconfig_path,
kubecli,
telemetry_k8s
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Inject zone failures
elif scenario_type == "zone_outages":
logging.info("Inject zone outages")
failed_post_scenarios, scenario_telemetries = zone_outages.run(scenarios_list, config, wait_duration, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = zone_outages.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Application outages
elif scenario_type == "application_outages":
logging.info("Injecting application outage")
failed_post_scenarios, scenario_telemetries = application_outage.run(
scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# PVC scenarios
# krkn_lib
elif scenario_type == "pvc_scenarios":
logging.info("Running PVC scenario")
failed_post_scenarios, scenario_telemetries = pvc_scenario.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = pvc_scenario.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Network scenarios
# krkn_lib
elif scenario_type == "network_chaos":
logging.info("Running Network Chaos")
failed_post_scenarios, scenario_telemetries = network_chaos.run(scenarios_list, config, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = network_chaos.run(scenarios_list,
config,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
elif scenario_type == "service_hijacking":
logging.info("Running Service Hijacking Chaos")
failed_post_scenarios, scenario_telemetries = service_hijacking_plugin.run(scenarios_list, wait_duration, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = service_hijacking_plugin.run(scenarios_list,
wait_duration,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
elif scenario_type == "syn_flood":
logging.info("Running Syn Flood Chaos")
failed_post_scenarios, scenario_telemetries = syn_flood.run(scenarios_list, kubecli, telemetry_k8s)
failed_post_scenarios, scenario_telemetries = syn_flood.run(scenarios_list,
telemetry_ocp,
telemetry_request_id
)
chaos_telemetry.scenarios.extend(scenario_telemetries)
# Check for critical alerts when enabled
@@ -454,7 +499,8 @@ def main(cfg) -> int:
else:
telemetry_k8s.collect_cluster_metadata(chaos_telemetry)
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(chaos_telemetry.to_json()))
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
@@ -470,7 +516,6 @@ def main(cfg) -> int:
logging.info(f"telemetry upload log: {safe_logger.log_file_name}")
try:
telemetry_k8s.send_telemetry(config["telemetry"], telemetry_request_id, chaos_telemetry)
telemetry_k8s.put_cluster_events(telemetry_request_id, config["telemetry"], start_time, end_time)
telemetry_k8s.put_critical_alerts(telemetry_request_id, config["telemetry"], summary)
# prometheus data collection is available only on Openshift
if config["telemetry"]["prometheus_backup"]:
@@ -499,8 +544,7 @@ def main(cfg) -> int:
if prometheus_archive_files:
safe_logger.info("starting prometheus archive upload:")
telemetry_k8s.put_prometheus_data(config["telemetry"], prometheus_archive_files, telemetry_request_id)
if config["telemetry"]["logs_backup"] and distribution == "openshift":
telemetry_ocp.put_ocp_logs(telemetry_request_id, config["telemetry"], start_time, end_time)
except Exception as e:
logging.error(f"failed to send telemetry data: {str(e)}")
else:
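
The new log-collection path is gated entirely on the telemetry section of the config rather than on the distribution check removed above; a minimal sketch of the keys collect_and_put_ocp_logs() reads (example values, unrelated telemetry keys omitted):

config["telemetry"] = {
    "enabled": True,      # telemetry must be enabled at all
    "logs_backup": True,  # logs upload gate, moved here from run_kraken.py
    # prometheus_backup and the other telemetry keys are unchanged by this commit
}
# additionally, telemetry_ocp.kubecli.is_kubernetes() must be False,
# i.e. the target must be an OpenShift cluster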