From 69fc8e8d1b886a68e08bf433f51f2ae65484b885 Mon Sep 17 00:00:00 2001 From: Robert O'Brien Date: Tue, 10 May 2022 18:50:48 +0100 Subject: [PATCH] Add resource version to list node call --- kraken/kubernetes/client.py | 25 ++++++++- kraken/node_actions/common_node_functions.py | 57 ++++---------------- node_status.py | 0 3 files changed, 34 insertions(+), 48 deletions(-) delete mode 100644 node_status.py diff --git a/kraken/kubernetes/client.py b/kraken/kubernetes/client.py index 102a0ddc..0b136a64 100644 --- a/kraken/kubernetes/client.py +++ b/kraken/kubernetes/client.py @@ -1,4 +1,4 @@ -from kubernetes import client, config, utils +from kubernetes import client, config, utils, watch from kubernetes.dynamic.client import DynamicClient from kubernetes.stream import stream from kubernetes.client.rest import ApiException @@ -19,6 +19,7 @@ def initialize_clients(kubeconfig_path): global api_client global dyn_client global custom_object_client + global watch try: config.load_kube_config(kubeconfig_path) cli = client.CoreV1Api() @@ -27,6 +28,7 @@ def initialize_clients(kubeconfig_path): custom_object_client = client.CustomObjectsApi() k8s_client = config.new_client_from_config() dyn_client = DynamicClient(k8s_client) + watch = watch.Watch() except ApiException as e: logging.error("Failed to initialize kubernetes client: %s\n" % e) sys.exit(1) @@ -584,3 +586,24 @@ def find_kraken_node(): except Exception as e: logging.info("%s" % (e)) sys.exit(1) + + +# Watch for a specific node status +def watch_node_status(node, status, timeout, resource_version): + for event in watch( + cli.list_node, + field_selector=f"metadata.name={node}", + timeout_seconds=timeout, + resource_version=f"{resource_version}", + ): + conditions = [status for status in event["object"].status.conditions if status.type == "Ready"] + if conditions[0].status == status: + watch.stop() + break + else: + logging.info("node status " + str(conditions[0].status)) + + +# Get the resource version for the specified node +def get_node_resource_version(node): + return cli.read_node(name=node).metadata.resource_version diff --git a/kraken/node_actions/common_node_functions.py b/kraken/node_actions/common_node_functions.py index 67c5aef6..6a3b09a8 100644 --- a/kraken/node_actions/common_node_functions.py +++ b/kraken/node_actions/common_node_functions.py @@ -4,7 +4,6 @@ import logging import paramiko import kraken.kubernetes.client as kubecli import kraken.invoke.command as runcommand -from kubernetes import client, watch node_general = False @@ -32,56 +31,20 @@ def get_node(node_name, label_selector, instance_kill_count): # Wait until the node status becomes Ready def wait_for_ready_status(node, timeout): - w = watch.Watch() - v1 = client.CoreV1Api() - for event in w.stream( - v1.list_node, - field_selector=f"metadata.name={node}", - timeout_seconds=timeout, - limit=1, - ): - conditions = [status for status in event["object"].status.conditions if status.type == "Ready"] - if conditions[0].status == "True": - w.stop() - break - else: - logging.info("node status " + str(conditions[0].status)) - - -# Wait until the node status becomes Unknown -def wait_for_unknown_status(node, timeout): - w = watch.Watch() - v1 = client.CoreV1Api() - for event in w.stream( - v1.list_node, - field_selector=f"metadata.name={node}", - timeout_seconds=timeout, - limit=1, - ): - conditions = [status for status in event["object"].status.conditions if status.type == "Ready"] - if conditions[0].status == "Unknown": - w.stop() - break - else: - logging.info("node status " + str(conditions[0].status)) + resource_version = kubecli.get_node_resource_version(node) + kubecli.watch_node_status(node, "True", timeout, resource_version) # Wait until the node status becomes Not Ready def wait_for_not_ready_status(node, timeout): - w = watch.Watch() - v1 = client.CoreV1Api() - for event in w.stream( - v1.list_node, - field_selector=f"metadata.name={node}", - timeout_seconds=timeout, - limit=1, - ): - conditions = [status for status in event["object"].status.conditions if status.type == "Ready"] - if conditions[0].status == "False": - w.stop() - break - else: - logging.info("node status " + str(conditions[0].status)) + resource_version = kubecli.get_node_resource_version(node) + kubecli.watch_node_status(node, "False", timeout, resource_version) + + +# Wait until the node status becomes Unknown +def wait_for_unknown_status(node, timeout): + resource_version = kubecli.get_node_resource_version(node) + kubecli.watch_node_status(node, "Unknown", timeout, resource_version) # Get the ip of the cluster node diff --git a/node_status.py b/node_status.py deleted file mode 100644 index e69de29b..00000000