Adds a startup option to produce Prow JUnit XML output for Sippy integration (#684)

* removed legacy kubernetes module

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* added options to produce the Sippy JUnit XML file

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* krkn-lib update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
Tullio Sebastiani
2024-08-13 12:40:34 +02:00
committed by GitHub
parent 1057917731
commit 9cd086f59c
7 changed files with 126 additions and 985 deletions
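
A minimal invocation sketch for the new options (the run_kraken.py entry point and the --config flag name are assumed here; the three --junit-* flags come from the diff below, and the artifacts directory must already exist and be writable):

python run_kraken.py --config config/config.yaml \
    --junit-testcase "krkn chaos run" \
    --junit-testcase-path /tmp/artifacts \
    --junit-testcase-version 1.0

When the run finishes, a junit_krkn_<timestamp>.xml file is written into the given directory, with the captured log output embedded as the test case stdout.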


@@ -1,892 +0,0 @@
import logging
import re
import sys
import time
from kubernetes import client, config, utils, watch
from kubernetes.client.rest import ApiException
from kubernetes.dynamic.client import DynamicClient
from kubernetes.stream import stream
from ..kubernetes.resources import (PVC, ChaosEngine, ChaosResult, Container,
LitmusChaosObject, Pod, Volume,
VolumeMount)
kraken_node_name = ""
# Load kubeconfig and initialize kubernetes python client
def initialize_clients(kubeconfig_path):
global cli
global batch_cli
global watch_resource
global api_client
global dyn_client
global custom_object_client
try:
if kubeconfig_path:
config.load_kube_config(kubeconfig_path)
else:
config.load_incluster_config()
api_client = client.ApiClient()
cli = client.CoreV1Api(api_client)
batch_cli = client.BatchV1Api(api_client)
custom_object_client = client.CustomObjectsApi(api_client)
dyn_client = DynamicClient(api_client)
watch_resource = watch.Watch()
except ApiException as e:
logging.error("Failed to initialize kubernetes client: %s\n" % e)
sys.exit(1)
def get_host() -> str:
"""Returns the Kubernetes server URL"""
return client.configuration.Configuration.get_default_copy().host
def get_clusterversion_string() -> str:
"""
Returns clusterversion status text on OpenShift, empty string
on other distributions
"""
try:
cvs = custom_object_client.list_cluster_custom_object(
"config.openshift.io",
"v1",
"clusterversions",
)
for cv in cvs["items"]:
for condition in cv["status"]["conditions"]:
if condition["type"] == "Progressing":
return condition["message"]
return ""
except client.exceptions.ApiException as e:
if e.status == 404:
return ""
else:
raise
# List all namespaces
def list_namespaces(label_selector=None):
namespaces = []
try:
if label_selector:
ret = cli.list_namespace(
pretty=True,
label_selector=label_selector
)
else:
ret = cli.list_namespace(pretty=True)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e
)
raise e
for namespace in ret.items:
namespaces.append(namespace.metadata.name)
return namespaces
def get_namespace_status(namespace_name):
"""Get status of a given namespace"""
ret = ""
try:
ret = cli.read_namespace_status(namespace_name)
except ApiException as e:
logging.error(
"Exception when calling CoreV1Api->read_namespace_status: %s\n" % e
)
return ret.status.phase
def delete_namespace(namespace):
"""Deletes a given namespace using kubernetes python client"""
try:
api_response = cli.delete_namespace(namespace)
logging.debug(
"Namespace deleted. status='%s'" % str(api_response.status)
)
return api_response
except Exception as e:
logging.error(
"Exception when calling \
CoreV1Api->delete_namespace: %s\n"
% e
)
def check_namespaces(namespaces, label_selectors=None):
"""Check if all the watch_namespaces are valid"""
try:
valid_namespaces = list_namespaces(label_selectors)
regex_namespaces = set(namespaces) - set(valid_namespaces)
final_namespaces = set(namespaces) - set(regex_namespaces)
valid_regex = set()
if regex_namespaces:
for namespace in valid_namespaces:
for regex_namespace in regex_namespaces:
if re.search(regex_namespace, namespace):
final_namespaces.add(namespace)
valid_regex.add(regex_namespace)
break
invalid_namespaces = regex_namespaces - valid_regex
if invalid_namespaces:
raise Exception(
"There exists no namespaces matching: %s" %
(invalid_namespaces)
)
return list(final_namespaces)
except Exception as e:
logging.info("%s" % (e))
sys.exit(1)
# List nodes in the cluster
def list_nodes(label_selector=None):
nodes = []
try:
if label_selector:
ret = cli.list_node(pretty=True, label_selector=label_selector)
else:
ret = cli.list_node(pretty=True)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e)
raise e
for node in ret.items:
nodes.append(node.metadata.name)
return nodes
# List nodes in the cluster that can be killed
def list_killable_nodes(label_selector=None):
nodes = []
try:
if label_selector:
ret = cli.list_node(pretty=True, label_selector=label_selector)
else:
ret = cli.list_node(pretty=True)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e)
raise e
for node in ret.items:
if kraken_node_name != node.metadata.name:
for cond in node.status.conditions:
if str(cond.type) == "Ready" and str(cond.status) == "True":
nodes.append(node.metadata.name)
return nodes
# List managedclusters attached to the hub that can be killed
def list_killable_managedclusters(label_selector=None):
managedclusters = []
try:
ret = custom_object_client.list_cluster_custom_object(
group="cluster.open-cluster-management.io",
version="v1",
plural="managedclusters",
label_selector=label_selector
)
except ApiException as e:
logging.error("Exception when calling CustomObjectsApi->list_cluster_custom_object: %s\n" % e)
raise e
for managedcluster in ret['items']:
conditions = managedcluster['status']['conditions']
available = list(filter(lambda condition: condition['reason'] == 'ManagedClusterAvailable', conditions))
if available and available[0]['status'] == 'True':
managedclusters.append(managedcluster['metadata']['name'])
return managedclusters
# List pods in the given namespace
def list_pods(namespace, label_selector=None):
pods = []
try:
if label_selector:
ret = cli.list_namespaced_pod(
namespace,
pretty=True,
label_selector=label_selector
)
else:
ret = cli.list_namespaced_pod(namespace, pretty=True)
except ApiException as e:
logging.error(
"Exception when calling \
CoreV1Api->list_namespaced_pod: %s\n"
% e
)
raise e
for pod in ret.items:
pods.append(pod.metadata.name)
return pods
def get_all_pods(label_selector=None):
pods = []
if label_selector:
ret = cli.list_pod_for_all_namespaces(
pretty=True,
label_selector=label_selector
)
else:
ret = cli.list_pod_for_all_namespaces(pretty=True)
for pod in ret.items:
pods.append([pod.metadata.name, pod.metadata.namespace])
return pods
# Execute command in pod
def exec_cmd_in_pod(
command,
pod_name,
namespace,
container=None,
base_command="bash"
):
exec_command = [base_command, "-c", command]
try:
if container:
ret = stream(
cli.connect_get_namespaced_pod_exec,
pod_name,
namespace,
container=container,
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
else:
ret = stream(
cli.connect_get_namespaced_pod_exec,
pod_name,
namespace,
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
except Exception:
return False
return ret
def delete_pod(name, namespace):
try:
cli.delete_namespaced_pod(name=name, namespace=namespace)
while cli.read_namespaced_pod(name=name, namespace=namespace):
time.sleep(1)
except ApiException as e:
if e.status == 404:
logging.info("Pod already deleted")
else:
logging.error("Failed to delete pod %s" % e)
raise e
def create_pod(body, namespace, timeout=120):
try:
pod_stat = None
pod_stat = cli.create_namespaced_pod(body=body, namespace=namespace)
end_time = time.time() + timeout
while True:
pod_stat = cli.read_namespaced_pod(
name=body["metadata"]["name"],
namespace=namespace
)
if pod_stat.status.phase == "Running":
break
if time.time() > end_time:
raise Exception("Starting pod failed")
time.sleep(1)
except Exception as e:
logging.error("Pod creation failed %s" % e)
if pod_stat:
logging.error(pod_stat.status.container_statuses)
delete_pod(body["metadata"]["name"], namespace)
sys.exit(1)
def read_pod(name, namespace="default"):
return cli.read_namespaced_pod(name=name, namespace=namespace)
def get_pod_log(name, namespace="default"):
return cli.read_namespaced_pod_log(
name=name,
namespace=namespace,
_return_http_data_only=True,
_preload_content=False
)
def get_containers_in_pod(pod_name, namespace):
pod_info = cli.read_namespaced_pod(pod_name, namespace)
container_names = []
for cont in pod_info.spec.containers:
container_names.append(cont.name)
return container_names
def delete_job(name, namespace="default"):
try:
api_response = batch_cli.delete_namespaced_job(
name=name,
namespace=namespace,
body=client.V1DeleteOptions(
propagation_policy="Foreground",
grace_period_seconds=0
),
)
logging.debug("Job deleted. status='%s'" % str(api_response.status))
return api_response
except ApiException as api:
logging.warn(
"Exception when calling \
BatchV1Api->delete_namespaced_job: %s"
% api
)
logging.warn("Job already deleted\n")
except Exception as e:
logging.error(
"Exception when calling \
BatchV1Api->delete_namespaced_job: %s\n"
% e
)
sys.exit(1)
def create_job(body, namespace="default"):
try:
api_response = batch_cli.create_namespaced_job(
body=body,
namespace=namespace
)
return api_response
except ApiException as api:
logging.warn(
"Exception when calling \
BatchV1Api->create_job: %s"
% api
)
if api.status == 409:
logging.warn("Job already present")
except Exception as e:
logging.error(
"Exception when calling \
BatchV1Api->create_namespaced_job: %s"
% e
)
raise
def create_manifestwork(body, namespace):
try:
api_response = custom_object_client.create_namespaced_custom_object(
group="work.open-cluster-management.io",
version="v1",
plural="manifestworks",
body=body,
namespace=namespace
)
return api_response
except ApiException as e:
print("Exception when calling CustomObjectsApi->create_namespaced_custom_object: %s\n" % e)
def delete_manifestwork(namespace):
try:
api_response = custom_object_client.delete_namespaced_custom_object(
group="work.open-cluster-management.io",
version="v1",
plural="manifestworks",
name="managedcluster-scenarios-template",
namespace=namespace
)
return api_response
except ApiException as e:
print("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\n" % e)
def get_job_status(name, namespace="default"):
try:
return batch_cli.read_namespaced_job_status(
name=name,
namespace=namespace
)
except Exception as e:
logging.error(
"Exception when calling \
BatchV1Api->read_namespaced_job_status: %s"
% e
)
raise
# Monitor the status of the cluster nodes and set the status to true or false
def monitor_nodes():
nodes = list_nodes()
notready_nodes = []
node_kerneldeadlock_status = "False"
for node in nodes:
try:
node_info = cli.read_node_status(node, pretty=True)
except ApiException as e:
logging.error(
"Exception when calling \
CoreV1Api->read_node_status: %s\n"
% e
)
raise e
for condition in node_info.status.conditions:
if condition.type == "KernelDeadlock":
node_kerneldeadlock_status = condition.status
elif condition.type == "Ready":
node_ready_status = condition.status
else:
continue
if node_kerneldeadlock_status != "False" or node_ready_status != "True": # noqa # noqa
notready_nodes.append(node)
if len(notready_nodes) != 0:
status = False
else:
status = True
return status, notready_nodes
# Monitor the status of the pods in the specified namespace
# and set the status to true or false
def monitor_namespace(namespace):
pods = list_pods(namespace)
notready_pods = []
for pod in pods:
try:
pod_info = cli.read_namespaced_pod_status(
pod,
namespace,
pretty=True
)
except ApiException as e:
logging.error(
"Exception when calling \
CoreV1Api->read_namespaced_pod_status: %s\n"
% e
)
raise e
pod_status = pod_info.status.phase
if (
pod_status != "Running" and
pod_status != "Completed" and
pod_status != "Succeeded"
):
notready_pods.append(pod)
if len(notready_pods) != 0:
status = False
else:
status = True
return status, notready_pods
# Monitor component namespace
def monitor_component(iteration, component_namespace):
watch_component_status, failed_component_pods = \
monitor_namespace(component_namespace)
logging.info(
"Iteration %s: %s: %s" % (
iteration,
component_namespace,
watch_component_status
)
)
return watch_component_status, failed_component_pods
def apply_yaml(path, namespace='default'):
"""
Apply yaml config to create Kubernetes resources
Args:
path (string)
- Path to the YAML file
namespace (string)
- Namespace to create the resource
Returns:
The object created
"""
return utils.create_from_yaml(
api_client,
yaml_file=path,
namespace=namespace
)
def get_pod_info(name: str, namespace: str = 'default') -> Pod:
"""
Function to retrieve information about a specific pod
in a given namespace. The kubectl command is given by:
kubectl get pods <name> -n <namespace>
Args:
name (string)
- Name of the pod
namespace (string)
- Namespace to look for the pod
Returns:
- Data class object of type Pod with the output of the above
kubectl command in the given format if the pod exists
- Returns None if the pod doesn't exist
"""
pod_exists = check_if_pod_exists(name=name, namespace=namespace)
if pod_exists:
response = cli.read_namespaced_pod(
name=name,
namespace=namespace,
pretty='true'
)
container_list = []
# Create a list of containers present in the pod
for container in response.spec.containers:
volume_mount_list = []
for volume_mount in container.volume_mounts:
volume_mount_list.append(
VolumeMount(
name=volume_mount.name,
mountPath=volume_mount.mount_path
)
)
container_list.append(
Container(
name=container.name,
image=container.image,
volumeMounts=volume_mount_list
)
)
for i, container in enumerate(response.status.container_statuses):
container_list[i].ready = container.ready
# Create a list of volumes associated with the pod
volume_list = []
for volume in response.spec.volumes:
volume_name = volume.name
pvc_name = (
volume.persistent_volume_claim.claim_name
if volume.persistent_volume_claim is not None
else None
)
volume_list.append(Volume(name=volume_name, pvcName=pvc_name))
# Create the Pod data class object
pod_info = Pod(
name=response.metadata.name,
podIP=response.status.pod_ip,
namespace=response.metadata.namespace,
containers=container_list,
nodeName=response.spec.node_name,
volumes=volume_list
)
return pod_info
else:
logging.error(
"Pod '%s' doesn't exist in namespace '%s'" % (
str(name),
str(namespace)
)
)
return None
def get_litmus_chaos_object(
kind: str,
name: str,
namespace: str
) -> LitmusChaosObject:
"""
Function that returns an object of a custom resource type of
the litmus project. Currently, only ChaosEngine and ChaosResult
objects are supported.
Args:
kind (string)
- The custom resource type
namespace (string)
- Namespace where the custom object is present
Returns:
Data class object of a subclass of LitmusChaosObject
"""
group = 'litmuschaos.io'
version = 'v1alpha1'
if kind.lower() == 'chaosengine':
plural = 'chaosengines'
response = custom_object_client.get_namespaced_custom_object(
group=group,
plural=plural,
version=version,
namespace=namespace,
name=name
)
try:
engine_status = response['status']['engineStatus']
exp_status = response['status']['experiments'][0]['status']
except Exception:
engine_status = 'Not Initialized'
exp_status = 'Not Initialized'
custom_object = ChaosEngine(
kind='ChaosEngine',
group=group,
namespace=namespace,
name=name,
plural=plural,
version=version,
engineStatus=engine_status,
expStatus=exp_status
)
elif kind.lower() == 'chaosresult':
plural = 'chaosresults'
response = custom_object_client.get_namespaced_custom_object(
group=group,
plural=plural,
version=version,
namespace=namespace,
name=name
)
try:
verdict = response['status']['experimentStatus']['verdict']
fail_step = response['status']['experimentStatus']['failStep']
except Exception:
verdict = 'N/A'
fail_step = 'N/A'
custom_object = ChaosResult(
kind='ChaosResult',
group=group,
namespace=namespace,
name=name,
plural=plural,
version=version,
verdict=verdict,
failStep=fail_step
)
else:
logging.error("Invalid litmus chaos custom resource name")
custom_object = None
return custom_object
def check_if_namespace_exists(name: str) -> bool:
"""
Function that checks if a namespace exists by parsing through
the list of projects.
Args:
name (string)
- Namespace name
Returns:
Boolean value indicating whether the namespace exists or not
"""
v1_projects = dyn_client.resources.get(
api_version='project.openshift.io/v1',
kind='Project'
)
project_list = v1_projects.get()
return True if name in str(project_list) else False
def check_if_pod_exists(name: str, namespace: str) -> bool:
"""
Function that checks if a pod exists in the given namespace
Args:
name (string)
- Pod name
namespace (string)
- Namespace name
Returns:
Boolean value indicating whether the pod exists or not
"""
namespace_exists = check_if_namespace_exists(namespace)
if namespace_exists:
pod_list = list_pods(namespace=namespace)
if name in pod_list:
return True
else:
logging.error("Namespace '%s' doesn't exist" % str(namespace))
return False
def check_if_pvc_exists(name: str, namespace: str) -> bool:
"""
Function that checks if a Persistent Volume Claim exists in the
given namespace.
Args:
name (string)
- PVC name
namespace (string)
- Namespace name
Returns:
Boolean value indicating whether the Persistent Volume Claim
exists or not.
"""
namespace_exists = check_if_namespace_exists(namespace)
if namespace_exists:
response = cli.list_namespaced_persistent_volume_claim(
namespace=namespace
)
pvc_list = [pvc.metadata.name for pvc in response.items]
if name in pvc_list:
return True
else:
logging.error("Namespace '%s' doesn't exist" % str(namespace))
return False
def get_pvc_info(name: str, namespace: str) -> PVC:
"""
Function to retrieve information about a Persistent Volume Claim in a
given namespace
Args:
name (string)
- Name of the persistent volume claim
namespace (string)
- Namespace where the persistent volume claim is present
Returns:
- A PVC data class containing the name, capacity, volume name,
namespace and associated pod names of the PVC if the PVC exists
- Returns None if the PVC doesn't exist
"""
pvc_exists = check_if_pvc_exists(name=name, namespace=namespace)
if pvc_exists:
pvc_info_response = cli.read_namespaced_persistent_volume_claim(
name=name,
namespace=namespace,
pretty=True
)
pod_list_response = cli.list_namespaced_pod(namespace=namespace)
capacity = pvc_info_response.status.capacity['storage']
volume_name = pvc_info_response.spec.volume_name
# Loop through all pods in the namespace to find associated PVCs
pvc_pod_list = []
for pod in pod_list_response.items:
for volume in pod.spec.volumes:
if (
volume.persistent_volume_claim is not None
and volume.persistent_volume_claim.claim_name == name
):
pvc_pod_list.append(pod.metadata.name)
pvc_info = PVC(
name=name,
capacity=capacity,
volumeName=volume_name,
podNames=pvc_pod_list,
namespace=namespace
)
return pvc_info
else:
logging.error(
"PVC '%s' doesn't exist in namespace '%s'" % (
str(name),
str(namespace)
)
)
return None
# Find the node kraken is deployed on
# Set global kraken node to not delete
def find_kraken_node():
pods = get_all_pods()
kraken_pod_name = None
for pod in pods:
if "kraken-deployment" in pod[0]:
kraken_pod_name = pod[0]
kraken_project = pod[1]
break
# have to switch to proper project
if kraken_pod_name:
# get kraken-deployment pod, find node name
try:
node_name = get_pod_info(kraken_pod_name, kraken_project).nodeName
global kraken_node_name
kraken_node_name = node_name
except Exception as e:
logging.info("%s" % (e))
sys.exit(1)
# Watch for a specific node status
def watch_node_status(node, status, timeout, resource_version):
count = timeout
for event in watch_resource.stream(
cli.list_node,
field_selector=f"metadata.name={node}",
timeout_seconds=timeout,
resource_version=f"{resource_version}"
):
conditions = [
status
for status in event["object"].status.conditions
if status.type == "Ready"
]
if conditions[0].status == status:
watch_resource.stop()
break
else:
count -= 1
logging.info(
"Status of node " + node + ": " + str(conditions[0].status)
)
if not count:
watch_resource.stop()
# Watch for a specific managedcluster status
# TODO: Implement this with a watcher instead of polling
def watch_managedcluster_status(managedcluster, status, timeout):
elapsed_time = 0
while True:
conditions = custom_object_client.get_cluster_custom_object_status(
"cluster.open-cluster-management.io", "v1", "managedclusters", managedcluster
)['status']['conditions']
available = list(filter(lambda condition: condition['reason'] == 'ManagedClusterAvailable', conditions))
if status == "True":
if available and available[0]['status'] == "True":
logging.info("Status of managedcluster " + managedcluster + ": Available")
return True
else:
if not available:
logging.info("Status of managedcluster " + managedcluster + ": Unavailable")
return True
time.sleep(2)
elapsed_time += 2
if elapsed_time >= timeout:
logging.info("Timeout waiting for managedcluster " + managedcluster + " to become: " + status)
return False
# Get the resource version for the specified node
def get_node_resource_version(node):
return cli.read_node(name=node).metadata.resource_version


@@ -1,74 +0,0 @@
from dataclasses import dataclass
from typing import List
@dataclass(frozen=True, order=False)
class Volume:
"""Data class to hold information regarding volumes in a pod"""
name: str
pvcName: str
@dataclass(order=False)
class VolumeMount:
"""Data class to hold information regarding volume mounts"""
name: str
mountPath: str
@dataclass(frozen=True, order=False)
class PVC:
"""Data class to hold information regarding persistent volume claims"""
name: str
capacity: str
volumeName: str
podNames: List[str]
namespace: str
@dataclass(order=False)
class Container:
"""Data class to hold information regarding containers in a pod"""
image: str
name: str
volumeMounts: List[VolumeMount]
ready: bool = False
@dataclass(frozen=True, order=False)
class Pod:
"""Data class to hold information regarding a pod"""
name: str
podIP: str
namespace: str
containers: List[Container]
nodeName: str
volumes: List[Volume]
@dataclass(frozen=True, order=False)
class LitmusChaosObject:
"""Data class to hold information regarding a custom object of litmus project"""
kind: str
group: str
namespace: str
name: str
plural: str
version: str
@dataclass(frozen=True, order=False)
class ChaosEngine(LitmusChaosObject):
"""Data class to hold information regarding a ChaosEngine object"""
engineStatus: str
expStatus: str
@dataclass(frozen=True, order=False)
class ChaosResult(LitmusChaosObject):
"""Data class to hold information regarding a ChaosResult object"""
verdict: str
failStep: str


@@ -0,0 +1,12 @@
import logging
class TeeLogHandler(logging.Handler):
logs: list[str] = []
name = "TeeLogHandler"
def get_output(self) -> str:
return "\n".join(self.logs)
def emit(self, record):
self.logs.append(self.formatter.format(record))
def __del__(self):
pass
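
A minimal usage sketch for the handler above (standard library only; it mirrors the wiring added to the main script further down, where logging.basicConfig attaches the formatter to every handler it is given):

import logging
from kraken.utils import TeeLogHandler

tee_handler = TeeLogHandler()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(), tee_handler],
)
logging.info("chaos scenario started")
# get_output() returns every formatted record captured so far;
# the junit test case later embeds this as its stdout
print(tee_handler.get_output())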

kraken/utils/__init__.py Normal file

@@ -0,0 +1 @@
from .TeeLogHandler import TeeLogHandler


@@ -15,7 +15,7 @@ google-api-python-client==2.116.0
ibm_cloud_sdk_core==3.18.0
ibm_vpc==0.20.0
jinja2==3.1.4
krkn-lib==2.1.7
krkn-lib==2.1.9
lxml==5.1.0
kubernetes==28.1.0
oauth2client==4.1.3


@@ -35,13 +35,14 @@ from krkn_lib.telemetry.k8s import KrknTelemetryKubernetes
from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift
from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
from kraken.utils import TeeLogHandler
report_file = ""
# Main function
def main(cfg):
def main(cfg) -> int:
# Start kraken
print(pyfiglet.figlet_format("kraken"))
logging.info("Starting kraken")
@@ -108,7 +109,8 @@ def main(cfg):
logging.error(
"Cannot read the kubeconfig file at %s, please check" % kubeconfig_path
)
sys.exit(1)
#sys.exit(1)
return 1
logging.info("Initializing client to talk to the Kubernetes cluster")
# Generate uuid for the run
@@ -142,10 +144,12 @@ def main(cfg):
# Set up kraken url to track signal
if not 0 <= int(port) <= 65535:
logging.error("%s isn't a valid port number, please check" % (port))
sys.exit(1)
#sys.exit(1)
return 1
if not signal_address:
logging.error("Please set the signal address in the config")
sys.exit(1)
#sys.exit(1)
return 1
address = (signal_address, port)
# If publish_running_status is False this should keep us going
@@ -176,7 +180,7 @@ def main(cfg):
except Exception:
logging.error("invalid distribution selected, running openshift scenarios against kubernetes cluster."
"Please set 'kubernetes' in config.yaml krkn.platform and try again")
sys.exit(1)
return 1
if cv != "":
logging.info(cv)
else:
@@ -251,7 +255,7 @@ def main(cfg):
"plugin_scenarios with the "
"kill-pods configuration instead."
)
sys.exit(1)
return 1
elif scenario_type == "arcaflow_scenarios":
failed_post_scenarios, scenario_telemetries = arcaflow_plugin.run(
scenarios_list, kubeconfig_path, telemetry_k8s
@@ -453,17 +457,20 @@ def main(cfg):
)
else:
logging.error("Alert profile is not defined")
sys.exit(1)
#sys.exit(1)
return 1
if post_critical_alerts > 0:
logging.error("Critical alerts are firing, please check; exiting")
sys.exit(2)
#sys.exit(2)
return 2
if failed_post_scenarios:
logging.error(
"Post scenarios are still failing at the end of all iterations"
)
sys.exit(2)
#sys.exit(2)
return 2
logging.info(
"Successfully finished running Kraken. UUID for the run: "
@@ -471,7 +478,11 @@ def main(cfg):
)
else:
logging.error("Cannot find a config at %s, please check" % (cfg))
sys.exit(1)
#sys.exit(1)
return 2
return 0
if __name__ == "__main__":
@@ -491,19 +502,102 @@ if __name__ == "__main__":
help="output report location",
default="kraken.report",
)
parser.add_option(
"--junit-testcase",
dest="junit_testcase",
help="junit test case description",
default=None,
)
parser.add_option(
"--junit-testcase-path",
dest="junit_testcase_path",
help="junit test case path",
default=None,
)
parser.add_option(
"--junit-testcase-version",
dest="junit_testcase_version",
help="junit test case version",
default=None,
)
(options, args) = parser.parse_args()
report_file = options.output
tee_handler = TeeLogHandler()
handlers = [logging.FileHandler(report_file, mode="w"), logging.StreamHandler(), tee_handler]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(report_file, mode="w"),
logging.StreamHandler(),
],
handlers=handlers,
)
option_error = False
# used to check if there is any missing or wrong parameter that prevents
# the creation of the junit file
junit_error = False
junit_normalized_path = None
retval = 0
junit_start_time = time.time()
# checks if both mandatory options for junit are set
if options.junit_testcase_path and not options.junit_testcase:
logging.error("please set junit test case description with --junit-testcase [description] option")
option_error = True
junit_error = True
if options.junit_testcase and not options.junit_testcase_path:
logging.error("please set junit test case path with --junit-testcase-path [path] option")
option_error = True
junit_error = True
# normalized path
if options.junit_testcase:
junit_normalized_path = os.path.normpath(options.junit_testcase_path)
if not os.path.exists(junit_normalized_path):
logging.error(f"{junit_normalized_path} do not exists, please select a valid path")
option_error = True
junit_error = True
if not os.path.isdir(junit_normalized_path):
logging.error(f"{junit_normalized_path} is a file, please select a valid folder path")
option_error = True
junit_error = True
if not os.access(junit_normalized_path, os.W_OK):
logging.error(f"{junit_normalized_path} is not writable, please select a valid path")
option_error = True
junit_error = True
if options.cfg is None:
logging.error("Please check if you have passed the config")
sys.exit(1)
option_error = True
if option_error:
retval = 1
else:
main(options.cfg)
retval = main(options.cfg)
junit_endtime = time.time()
# checks the minimum required parameters to write the junit file
if junit_normalized_path and not junit_error:
junit_testcase_xml = get_junit_test_case(
success=True if retval == 0 else False,
time=int(junit_endtime - junit_start_time),
test_suite_name="krkn-test-suite",
test_case_description=options.junit_testcase,
test_stdout=tee_handler.get_output(),
test_version=options.junit_testcase_version
)
junit_testcase_file_path = f"{junit_normalized_path}/junit_krkn_{int(time.time())}.xml"
logging.info(f"writing junit XML testcase in {junit_testcase_file_path}")
with open(junit_testcase_file_path, "w") as stream:
stream.write(junit_testcase_xml)
sys.exit(retval)