Files
kubeinvaders/scripts/metrics_loop/start.py
Eugenio Marzo 5fb2d9cac6 fix
2026-03-14 19:14:44 +01:00

349 lines
16 KiB
Python

from asyncio.log import logger
import yaml
import json
import logging
import os
import sys
# Root logger: INFO and above go to stdout. force=True makes basicConfig drop
# any handlers already attached to the root logger before installing this one.
logging.basicConfig(
level=logging.INFO,
stream=sys.stdout,
format="%(asctime)s %(levelname)s %(message)s",
force=True,
)
logging.getLogger().setLevel(logging.INFO)
# Mirror the same records, in the same format, into a log file under /tmp.
_file_handler = logging.FileHandler("/tmp/kubeinvaders_metrics_loop.log")
_file_handler.setLevel(logging.INFO)
_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logging.getLogger().addHandler(_file_handler)
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import requests
from string import Template
import string
import random
import redis
import time
import urllib3
from urllib.parse import urlparse
from urllib3.exceptions import LocationValueError
from urllib3.exceptions import MaxRetryError
import time
import datetime
# HTTP checks and the Kubernetes client in this script run with TLS
# verification turned off, so silence urllib3's InsecureRequestWarning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def normalize_k8s_host(raw_host):
    """Return *raw_host* as a usable Kubernetes API base URL, or "" if unusable.

    The value is converted to ``str``, trimmed of surrounding whitespace and
    quote characters, given an ``https://`` scheme when it carries neither
    ``http://`` nor ``https://``, and stripped of trailing slashes.

    Args:
        raw_host: Host or URL as read from argv, Redis or the environment.
            Any falsy value yields "".

    Returns:
        The normalized URL, or "" when nothing with a network location is left.
    """
    if not raw_host:
        return ""

    # Strip whitespace again after the quotes so that '" host "' is cleaned too.
    host = str(raw_host).strip().strip('"').strip("'").strip()
    if not host:
        return ""

    # URL schemes are case-insensitive: accept "HTTPS://..." as already having
    # a scheme (and lowercase it) instead of prefixing a second "https://".
    scheme, separator, remainder = host.partition("://")
    if separator and scheme.lower() in ("http", "https"):
        host = f"{scheme.lower()}://{remainder}"
    else:
        host = "https://" + host

    host = host.rstrip("/")
    # A bare scheme such as "https://" has no network location and is rejected.
    if not urlparse(host).netloc:
        return ""
    return host
def join_string_to_array(project_list_string, separator):
    """Split *project_list_string* on *separator* and return the pieces as a list.

    Despite its name this function splits rather than joins. Empty pieces are
    kept, so an empty input string yields ``[""]``.
    """
    pieces = project_list_string.split(separator)
    return pieces
def do_http_request(url, method, headers, data):
    """Send one HTTP request and return the response.

    Args:
        url: Target URL.
        method: HTTP method name passed to ``Session.request``.
        headers: Header mapping sent with the request.
        data: Request body.

    Returns:
        The response object on success, or the literal string
        ``"Connection Error"`` when ``requests`` raises a
        ``RequestException``.
    """
    request_options = {
        "headers": headers,
        "data": data,
        "verify": False,
        "allow_redirects": True,
        "timeout": 10,
    }
    try:
        with requests.Session() as http:
            # Do not inherit HTTP(S)_PROXY from the container environment;
            # those proxies often cannot resolve in-cluster or local ingress hosts.
            http.trust_env = False
            return http.request(method, url, **request_options)
    except requests.exceptions.RequestException as exc:
        logging.error(f"Error while sending HTTP request to {url} with method {method}: {exc}")
    return "Connection Error"
def check_if_json_is_valid(json_data):
    """Return True when *json_data* can be parsed by ``json.loads``.

    Args:
        json_data: A ``str``, ``bytes`` or ``bytearray`` holding JSON text.
            Any other value (for example ``None`` from a missing Redis key)
            is reported as invalid instead of raising.

    Returns:
        True if parsing succeeds, False otherwise.
    """
    try:
        json.loads(json_data)
    except (ValueError, TypeError):
        # ValueError: malformed JSON text. TypeError: input is not text/bytes.
        return False
    return True
def create_container(image, name, command, args):
    """Build a ``V1Container`` that pulls its image only if not already present.

    Args:
        image: Container image reference.
        name: Container name.
        command: Value for the container's ``command`` field.
        args: Value for the container's ``args`` field.

    Returns:
        The constructed ``client.V1Container``.
    """
    new_container = client.V1Container(
        name=name,
        image=image,
        command=command,
        args=args,
        image_pull_policy='IfNotPresent',
    )
    summary = (
        f"Created container with name: {new_container.name}, "
        f"image: {new_container.image} and args: {new_container.args}"
    )
    logging.debug(summary)
    return new_container
def create_pod_template(pod_name, container, job_name):
    """Build the pod template used by a chaos job.

    The pod runs the single given container with restart policy ``Never`` and
    is labelled ``chaos-controller=kubeinvaders`` plus ``job-name=<job_name>``.

    Args:
        pod_name: Name placed in the pod metadata.
        container: Container object to run in the pod.
        job_name: Value of the ``job-name`` label.

    Returns:
        The constructed ``client.V1PodTemplateSpec``.
    """
    pod_spec = client.V1PodSpec(restart_policy="Never", containers=[container])
    pod_labels = {"chaos-controller": "kubeinvaders", "job-name": job_name}
    pod_metadata = client.V1ObjectMeta(name=pod_name, labels=pod_labels)
    return client.V1PodTemplateSpec(spec=pod_spec, metadata=pod_metadata)
def create_job(job_name, pod_template):
    """Build a ``batch/v1`` Job object wrapping *pod_template*.

    The job is labelled ``chaos-controller=kubeinvaders`` and uses
    ``backoff_limit=0``.

    Args:
        job_name: Name placed in the job metadata.
        pod_template: Pod template the job will run.

    Returns:
        The constructed ``client.V1Job`` (nothing is submitted to the cluster).
    """
    job_metadata = client.V1ObjectMeta(
        name=job_name,
        labels={"chaos-controller": "kubeinvaders"},
    )
    job_spec = client.V1JobSpec(backoff_limit=0, template=pod_template)
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=job_metadata,
        spec=job_spec,
    )
# Redis is reached through a local unix socket; the token and API endpoint
# fallbacks below are read from it.
r = redis.Redis(unix_socket_path='/tmp/redis.sock')

# Only surface errors from the Kubernetes client library's own logger.
logging.getLogger('kubernetes').setLevel(logging.ERROR)
logging.debug('Starting script for KubeInvaders metrics loop')

configuration = client.Configuration()

# --- Bearer token: TOKEN environment variable first, then Redis x_k8s_token.
token = os.environ.get("TOKEN", "")
if not token:
    redis_token = r.get("x_k8s_token")
    if redis_token:
        token = redis_token.decode()
        logging.info("[k-inv][metrics_loop] TOKEN not set in env, using X-K8S-Token from Redis")
token = token.strip()
# The "Bearer" prefix is supplied through api_key_prefix below, so remove one
# that is already part of the stored value.
if token.lower().startswith("bearer "):
    token = token[len("bearer "):].strip()
if not token:
    logging.warning("[k-inv][metrics_loop] No token available (TOKEN env and Redis x_k8s_token both empty) — API calls may fail")
configuration.api_key = {"authorization": token}
configuration.api_key_prefix = {"authorization": "Bearer"}

# --- API host. Sources are tried in order until one normalizes to a non-empty
# URL: first CLI argument, Redis k8s_api_endpoint, ENDPOINT env var, the
# in-cluster service env vars, and finally a fixed default.
if len(sys.argv) > 1:
    arg_host = sys.argv[1]
else:
    arg_host = ""
resolved_host = normalize_k8s_host(arg_host)

if not resolved_host:
    redis_endpoint = r.get("k8s_api_endpoint")
    if redis_endpoint:
        if isinstance(redis_endpoint, bytes):
            redis_endpoint = redis_endpoint.decode()
        resolved_host = normalize_k8s_host(redis_endpoint)
        if resolved_host:
            logging.info(f"[k-inv][metrics_loop] Using Kubernetes host from Redis k8s_api_endpoint: {resolved_host}")

if not resolved_host:
    resolved_host = normalize_k8s_host(os.environ.get("ENDPOINT", ""))
    if resolved_host:
        logging.info(f"[k-inv][metrics_loop] Using Kubernetes host from ENDPOINT: {resolved_host}")

if not resolved_host:
    svc_host = os.environ.get("KUBERNETES_SERVICE_HOST", "")
    svc_port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "443")
    if svc_host:
        resolved_host = normalize_k8s_host(f"https://{svc_host}:{svc_port}")
    if resolved_host:
        logging.info(f"[k-inv][metrics_loop] Using in-cluster Kubernetes host: {resolved_host}")

if not resolved_host:
    resolved_host = "https://kubernetes.default.svc"
    logging.warning("[k-inv][metrics_loop] No valid Kubernetes host provided; defaulting to https://kubernetes.default.svc")

configuration.host = resolved_host
logging.info(f"[k-inv][metrics_loop] Kubernetes API host resolved to: {configuration.host}")

# TLS certificate verification towards the API server is turned off.
configuration.insecure_skip_tls_verify = True
configuration.verify_ssl = False

# One shared ApiClient backs both the core and the batch API wrappers.
api_client = client.ApiClient(configuration=configuration)
api_instance = client.CoreV1Api(api_client)
batch_api = client.BatchV1Api(api_client)
def rebuild_api_client_from_fallbacks(current_configuration):
    """Pick a fallback API host and build fresh Kubernetes API wrappers for it.

    Candidate hosts are tried in order: Redis ``k8s_api_endpoint``, the
    ``ENDPOINT`` environment variable, the in-cluster service variables, and
    finally ``https://kubernetes.default.svc``. The first one that
    ``normalize_k8s_host`` accepts is written to ``current_configuration.host``.

    Args:
        current_configuration: Client configuration; its ``host`` is mutated.

    Returns:
        A ``(CoreV1Api, BatchV1Api)`` pair sharing one new ``ApiClient``.
    """
    def _candidate_hosts():
        # Yielded lazily so later sources are only read when earlier ones fail.
        endpoint = r.get("k8s_api_endpoint")
        if endpoint:
            yield endpoint.decode() if isinstance(endpoint, bytes) else endpoint
        yield os.environ.get("ENDPOINT", "")
        svc_host = os.environ.get("KUBERNETES_SERVICE_HOST", "")
        svc_port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "443")
        yield f"https://{svc_host}:{svc_port}" if svc_host else ""

    normalized = (normalize_k8s_host(candidate) for candidate in _candidate_hosts())
    fallback_host = next((host for host in normalized if host), "https://kubernetes.default.svc")

    current_configuration.host = fallback_host
    logging.warning(f"[k-inv][metrics_loop] Rebuilding Kubernetes API client with host: {fallback_host}")
    rebuilt_client = client.ApiClient(configuration=current_configuration)
    return client.CoreV1Api(rebuilt_client), client.BatchV1Api(rebuilt_client)
def sync_api_client_from_redis(current_configuration):
    """Switch to the API host stored in Redis when it differs from the current one.

    Args:
        current_configuration: Client configuration; its ``host`` is updated
            when a different, valid host is found under ``k8s_api_endpoint``.

    Returns:
        A new ``(CoreV1Api, BatchV1Api)`` pair when the host changed, otherwise
        ``(None, None)`` (key missing, value unusable, or host unchanged).
    """
    no_change = (None, None)

    endpoint = r.get("k8s_api_endpoint")
    if not endpoint:
        return no_change
    if isinstance(endpoint, bytes):
        endpoint = endpoint.decode()

    target_host = normalize_k8s_host(endpoint)
    if not target_host or current_configuration.host == target_host:
        return no_change

    current_configuration.host = target_host
    logging.info(f"[k-inv][metrics_loop] Switching Kubernetes API host from Redis k8s_api_endpoint to: {target_host}")
    refreshed_client = client.ApiClient(configuration=current_configuration)
    return client.CoreV1Api(refreshed_client), client.BatchV1Api(refreshed_client)
# HTTP methods accepted for a chaos-report site check; anything else becomes GET.
_ALLOWED_CHECK_METHODS = ("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS")
# File overwritten with the parameters of the most recent site check.
_URL_DEBUG_LOG_PATH = "/tmp/kubeinvaders_url_debug.log"
# Seconds a pod may stay in a non-Pending/non-Running phase before deletion.
_FINISHED_POD_TTL_SECONDS = 120


def _write_url_debug_dump(project, chaos_program_key, raw_program, chaos_report_program, url, method, headers, payload):
    """Overwrite the URL debug file with the parameters of the current check.

    Best effort: a failure to write the file is logged and otherwise ignored so
    that a debugging aid cannot stop the metrics loop.
    NOTE(review): the dump includes request headers and payload verbatim;
    confirm that is acceptable for a file under /tmp.
    """
    try:
        with open(_URL_DEBUG_LOG_PATH, "w") as dbg:
            dbg.write(f"--- {datetime.datetime.now().isoformat()} ---\n")
            dbg.write(f" project = {project!r}\n")
            dbg.write(f" chaos_program_key= {chaos_program_key!r}\n")
            dbg.write(f" raw redis value = {raw_program!r}\n")
            dbg.write(f" chaos_report_program = {chaos_report_program}\n")
            dbg.write(f" url (raw) = {chaos_report_program.get('chaosReportCheckSiteURL')!r}\n")
            dbg.write(f" url (stripped) = {url!r}\n")
            dbg.write(f" method = {method!r}\n")
            dbg.write(f" headers = {headers}\n")
            dbg.write(f" payload = {payload!r}\n")
    except OSError as e:
        logging.warning(f"[k-inv][metrics_loop][chaos_report] Could not write {_URL_DEBUG_LOG_PATH}: {e}")


def _check_project_site(project):
    """Run the configured site check for one chaos-report project.

    Reads the JSON program stored under ``chaos_report_project_<project>``,
    performs the HTTP request it describes and stores counter, start time,
    status code and elapsed time in Redis under keys prefixed with the
    program's ``chaosReportProject`` value. Does nothing when the key is
    missing; logs a warning and returns when the stored value is unusable.
    """
    logging.info(f"[k-inv][metrics_loop] Computing Chaos Report Project: {project}")
    chaos_program_key = f"chaos_report_project_{project}"

    # Read the value once: separate exists()/get() calls could observe the key
    # disappearing in between and hand None to json.loads.
    raw_program = r.get(chaos_program_key)
    if raw_program is None:
        return
    logging.info(f"[k-inv][metrics_loop][chaos_report] Found chaos_report_project_{project} key in Redis. Starting {project} ")

    try:
        chaos_report_program = json.loads(raw_program)
    except (ValueError, TypeError) as e:
        logging.warning(f"[k-inv][metrics_loop][chaos_report] Invalid JSON in {chaos_program_key}: {e}")
        return
    # Everything below needs a JSON object carrying the project name used to
    # build the result keys; skip anything else instead of crashing the loop.
    if not isinstance(chaos_report_program, dict) or 'chaosReportProject' not in chaos_report_program:
        logging.warning(
            f"[k-inv][metrics_loop][chaos_report] Program in {chaos_program_key} is not an object with chaosReportProject; skipping"
        )
        return

    check_started_at = datetime.datetime.now()
    logging.info(f"[k-inv][metrics_loop][chaos_report] chaos_report_program is valid JSON: {chaos_report_program}")

    url = str(chaos_report_program.get('chaosReportCheckSiteURL', '')).strip()
    method = str(chaos_report_program.get('chaosReportCheckSiteURLMethod', 'GET')).strip().upper()
    payload = chaos_report_program.get('chaosReportCheckSiteURLPayload', '')
    logging.info(
        f"[k-inv][metrics_loop][chaos_report] project={project} parsed url='{url}' method='{method}'"
    )

    # Headers may be stored either as a JSON string or as an object; anything
    # that does not end up as a dict falls back to the default header set.
    headers = {"Content-Type": "application/json; charset=utf-8"}
    raw_headers = chaos_report_program.get('chaosReportCheckSiteURLHeaders', '{}')
    try:
        parsed_headers = json.loads(raw_headers) if isinstance(raw_headers, str) else raw_headers
        if isinstance(parsed_headers, dict):
            headers = parsed_headers
    except ValueError as e:
        logging.warning(f"Invalid chaos report headers for project {project}: {e}")

    if method not in _ALLOWED_CHECK_METHODS:
        method = "GET"

    _write_url_debug_dump(
        project, chaos_program_key, raw_program, chaos_report_program, url, method, headers, payload
    )

    if not url:
        logging.warning(
            f"[k-inv][metrics_loop][chaos_report] Empty chaosReportCheckSiteURL for project={project}. Raw program: {chaos_report_program}"
        )
        response = "Connection Error"
    else:
        response = do_http_request(url, method, headers, payload)

    report_project = chaos_report_program['chaosReportProject']
    check_url_counter_key = f"{report_project}_check_url_counter"
    check_url_status_code_key = f"{report_project}_check_url_status_code"
    check_url_elapsed_time_key = f"{report_project}_check_url_elapsed_time"
    check_url_start_time = f"{report_project}_check_url_start_time"

    # The counter is created at 0 on the first check and incremented afterwards.
    if r.get(check_url_counter_key) is None:
        r.set(check_url_counter_key, 0)
    else:
        r.incr(check_url_counter_key)
    if r.get(check_url_start_time) is None:
        r.set(check_url_start_time, check_started_at.strftime("%Y-%m-%d %H:%M:%S"))

    # The parsed `url` is logged rather than indexing the program dict, which
    # would raise KeyError on the empty-URL path when the key is absent.
    if response == "Connection Error":
        logging.info(f"[k-inv][metrics_loop][chaos_report] Connection Error while checking {url}")
        r.set(check_url_status_code_key, "Connection Error")
        r.set(check_url_elapsed_time_key, 0)
    else:
        logging.info(f"[k-inv][metrics_loop][chaos_report] Status code {response.status_code} while checking {url}")
        r.set(check_url_status_code_key, response.status_code)
        r.set(check_url_elapsed_time_key, float(response.elapsed.total_seconds()))


def _track_chaos_pod(pod):
    """Update Redis bookkeeping for one chaos pod and reap it when it is old.

    Pending/Running pods increment ``current_chaos_job_pod``. Pods in any other
    phase get a first-seen timestamp under ``pod:time:<namespace>:<name>`` and
    are deleted once that timestamp is more than ``_FINISHED_POD_TTL_SECONDS``
    old. Pods labelled ``chaos-codename`` also get their job status and phase
    recorded.
    """
    phase = pod.status.phase
    pod_name = pod.metadata.name
    pod_time_key = f"pod:time:{pod.metadata.namespace}:{pod_name}"

    if phase in ("Pending", "Running"):
        logging.debug(f"[k-inv][metrics_loop] Found pod {pod_name}. It is in {phase} phase. Incrementing current_chaos_job_pod Redis key")
        r.incr('current_chaos_job_pod')
    else:
        # A single get() replaces exists()+get() so a key removed in between
        # cannot reach int() as None.
        first_seen = r.get(pod_time_key)
        if first_seen is None:
            logging.debug(f"[k-inv][metrics_loop] Found pod {pod_name}. It is in {phase} phase. Tracking time in {pod_time_key} Redis key")
            r.set(pod_time_key, int(time.time()))
        else:
            logging.debug(f"[k-inv][metrics_loop] Found pod {pod_name}. It is in {phase} phase. Comparing time in {pod_time_key} Redis key with now")
            now = int(time.time())
            pod_time = int(first_seen)
            logging.debug(f"[k-inv][metrics_loop] For {pod_name} comparing now:{now} with pod_time:{pod_time}")
            if now - pod_time > _FINISHED_POD_TTL_SECONDS:
                try:
                    api_instance.delete_namespaced_pod(pod_name, namespace=pod.metadata.namespace)
                    logging.debug(f"[k-inv][metrics_loop] Deleting pod {pod_name}")
                    r.delete(pod_time_key)
                except ApiException as e:
                    logging.debug(e)

    labels = pod.metadata.labels or {}
    codename = labels.get('chaos-codename')
    if codename is None:
        return
    raw_job_name = labels.get('job-name')
    if raw_job_name is None:
        # Without a job name the status keys cannot be built; skip this pod
        # instead of failing on None.replace().
        logging.debug(f"[k-inv][metrics_loop] Pod {pod_name} has chaos-codename but no job-name label; skipping job status")
        return
    job_name = raw_job_name.replace("-", "_")
    exp_name = labels.get('experiment-name')
    job_status = 1.0 if phase in ("Pending", "Running", "Succeeded") else -1
    r.set(f"chaos_jobs_status:{codename}:{exp_name}:{job_name}", job_status)
    r.set(f"chaos_jobs_pod_phase:{codename}:{exp_name}:{job_name}", phase)


# Main loop: once per second, follow API host changes published in Redis, run
# the site check of every listed chaos-report project, then refresh the
# bookkeeping for all pods labelled chaos-controller=kubeinvaders.
while True:
    maybe_api_instance, maybe_batch_api = sync_api_client_from_redis(configuration)
    if maybe_api_instance and maybe_batch_api:
        api_instance, batch_api = maybe_api_instance, maybe_batch_api

    # Read once instead of exists()+get() so an expiring key cannot yield None.
    project_list = r.get('chaos_report_project_list')
    if project_list is not None:
        logging.info("[k-inv][metrics_loop] Found chaos_report_project_list Redis Key")
        for project in join_string_to_array(project_list.decode(), ','):
            _check_project_site(project)

    api_response_items = []
    try:
        label_selector = "chaos-controller=kubeinvaders"
        api_response = api_instance.list_pod_for_all_namespaces(label_selector=label_selector)
        api_response_items = api_response.items or []
    except ApiException as e:
        logging.warning(f"[k-inv][metrics_loop] Kubernetes API error while listing pods: {e}")
    except LocationValueError as e:
        logging.warning(f"[k-inv][metrics_loop] Invalid Kubernetes host while listing pods: {e}")
        api_instance, batch_api = rebuild_api_client_from_fallbacks(configuration)
    except MaxRetryError as e:
        logging.warning(f"[k-inv][metrics_loop] Kubernetes host unreachable while listing pods: {e}")
        api_instance, batch_api = rebuild_api_client_from_fallbacks(configuration)
    except Exception as e:
        # Loop boundary: keep the daemon alive on anything unexpected.
        logging.warning(f"[k-inv][metrics_loop] Unexpected error while listing pods: {type(e).__name__}: {e}")

    # The gauge is recomputed from scratch on every pass.
    r.set("current_chaos_job_pod", 0)
    for pod in api_response_items:
        _track_chaos_pod(pod)

    time.sleep(1)