This commit is contained in:
Eugenio Marzo
2026-03-14 19:14:44 +01:00
parent c036e6a30f
commit 5fb2d9cac6
9 changed files with 269 additions and 31 deletions

View File

@@ -4,6 +4,18 @@ import json
import logging
import os
import sys
logging.basicConfig(
level=logging.INFO,
stream=sys.stdout,
format="%(asctime)s %(levelname)s %(message)s",
force=True,
)
logging.getLogger().setLevel(logging.INFO)
_file_handler = logging.FileHandler("/tmp/kubeinvaders_metrics_loop.log")
_file_handler.setLevel(logging.INFO)
_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logging.getLogger().addHandler(_file_handler)
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import requests
@@ -13,10 +25,28 @@ import random
import redis
import time
import urllib3
from urllib.parse import urlparse
from urllib3.exceptions import LocationValueError
from urllib3.exceptions import MaxRetryError
import time
import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def normalize_k8s_host(raw_host):
if not raw_host:
return ""
host = str(raw_host).strip().strip('"').strip("'")
if not host:
return ""
if not host.startswith("http://") and not host.startswith("https://"):
host = "https://" + host
host = host.rstrip("/")
parsed = urlparse(host)
if not parsed.netloc:
return ""
return host
def join_string_to_array(project_list_string, separator):
return project_list_string.split(separator)
@@ -86,31 +116,112 @@ def create_job(job_name, pod_template):
r = redis.Redis(unix_socket_path='/tmp/redis.sock')
logging.basicConfig(
level=os.environ.get("LOGLEVEL", "INFO"),
stream=sys.stdout,
format="%(asctime)s %(levelname)s %(message)s",
force=True,
)
logging.getLogger('kubernetes').setLevel(logging.ERROR)
logging.debug('Starting script for KubeInvaders metrics loop')
configuration = client.Configuration()
token = os.environ["TOKEN"]
configuration.api_key = {"authorization": f"Bearer {token}"}
configuration.host = sys.argv[1]
token = os.environ.get("TOKEN", "")
if not token:
redis_token = r.get("x_k8s_token")
if redis_token:
token = redis_token.decode()
logging.info("[k-inv][metrics_loop] TOKEN not set in env, using X-K8S-Token from Redis")
token = token.strip()
if token.lower().startswith("bearer "):
token = token[7:].strip()
if not token:
logging.warning("[k-inv][metrics_loop] No token available (TOKEN env and Redis x_k8s_token both empty) — API calls may fail")
configuration.api_key = {"authorization": token}
configuration.api_key_prefix = {"authorization": "Bearer"}
arg_host = sys.argv[1] if len(sys.argv) > 1 else ""
resolved_host = normalize_k8s_host(arg_host)
if not resolved_host:
redis_endpoint = r.get("k8s_api_endpoint")
if redis_endpoint:
redis_endpoint = redis_endpoint.decode() if isinstance(redis_endpoint, bytes) else redis_endpoint
resolved_host = normalize_k8s_host(redis_endpoint)
if resolved_host:
logging.info(f"[k-inv][metrics_loop] Using Kubernetes host from Redis k8s_api_endpoint: {resolved_host}")
if not resolved_host:
resolved_host = normalize_k8s_host(os.environ.get("ENDPOINT", ""))
if resolved_host:
logging.info(f"[k-inv][metrics_loop] Using Kubernetes host from ENDPOINT: {resolved_host}")
if not resolved_host:
svc_host = os.environ.get("KUBERNETES_SERVICE_HOST", "")
svc_port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "443")
resolved_host = normalize_k8s_host(f"https://{svc_host}:{svc_port}" if svc_host else "")
if resolved_host:
logging.info(f"[k-inv][metrics_loop] Using in-cluster Kubernetes host: {resolved_host}")
if not resolved_host:
resolved_host = "https://kubernetes.default.svc"
logging.warning("[k-inv][metrics_loop] No valid Kubernetes host provided; defaulting to https://kubernetes.default.svc")
configuration.host = resolved_host
logging.info(f"[k-inv][metrics_loop] Kubernetes API host resolved to: {configuration.host}")
configuration.insecure_skip_tls_verify = True
configuration.verify_ssl = False
client.Configuration.set_default(configuration)
client.Configuration.set_default(configuration)
api_client = client.ApiClient(configuration=configuration)
api_instance = client.CoreV1Api(api_client)
batch_api = client.BatchV1Api(api_client)
api_instance = client.CoreV1Api()
batch_api = client.BatchV1Api()
def rebuild_api_client_from_fallbacks(current_configuration):
fallback_host = ""
redis_endpoint = r.get("k8s_api_endpoint")
if redis_endpoint:
redis_endpoint = redis_endpoint.decode() if isinstance(redis_endpoint, bytes) else redis_endpoint
fallback_host = normalize_k8s_host(redis_endpoint)
if not fallback_host:
fallback_host = normalize_k8s_host(os.environ.get("ENDPOINT", ""))
if not fallback_host:
svc_host = os.environ.get("KUBERNETES_SERVICE_HOST", "")
svc_port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS", "443")
fallback_host = normalize_k8s_host(f"https://{svc_host}:{svc_port}" if svc_host else "")
if not fallback_host:
fallback_host = "https://kubernetes.default.svc"
current_configuration.host = fallback_host
logging.warning(f"[k-inv][metrics_loop] Rebuilding Kubernetes API client with host: {fallback_host}")
new_api_client = client.ApiClient(configuration=current_configuration)
return client.CoreV1Api(new_api_client), client.BatchV1Api(new_api_client)
def sync_api_client_from_redis(current_configuration):
redis_endpoint = r.get("k8s_api_endpoint")
if not redis_endpoint:
return None, None
redis_endpoint = redis_endpoint.decode() if isinstance(redis_endpoint, bytes) else redis_endpoint
redis_host = normalize_k8s_host(redis_endpoint)
if not redis_host:
return None, None
if current_configuration.host != redis_host:
current_configuration.host = redis_host
logging.info(f"[k-inv][metrics_loop] Switching Kubernetes API host from Redis k8s_api_endpoint to: {redis_host}")
new_api_client = client.ApiClient(configuration=current_configuration)
return client.CoreV1Api(new_api_client), client.BatchV1Api(new_api_client)
return None, None
while True:
maybe_api_instance, maybe_batch_api = sync_api_client_from_redis(configuration)
if maybe_api_instance and maybe_batch_api:
api_instance, batch_api = maybe_api_instance, maybe_batch_api
#logging.info(f"[k-inv][metrics_loop] Metrics loop is active - {r.exists('chaos_report_project_list')}")
if r.exists('chaos_report_project_list'):
logging.info("[k-inv][metrics_loop] Found chaos_report_project_list Redis Key")
@@ -145,6 +256,18 @@ while True:
if method not in ["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]:
method = "GET"
with open("/tmp/kubeinvaders_url_debug.log", "w") as dbg:
dbg.write(f"--- {datetime.datetime.now().isoformat()} ---\n")
dbg.write(f" project = {project!r}\n")
dbg.write(f" chaos_program_key= {chaos_program_key!r}\n")
dbg.write(f" raw redis value = {r.get(chaos_program_key)!r}\n")
dbg.write(f" chaos_report_program = {chaos_report_program}\n")
dbg.write(f" url (raw) = {chaos_report_program.get('chaosReportCheckSiteURL')!r}\n")
dbg.write(f" url (stripped) = {url!r}\n")
dbg.write(f" method = {method!r}\n")
dbg.write(f" headers = {headers}\n")
dbg.write(f" payload = {payload!r}\n")
if not url:
logging.warning(
f"[k-inv][metrics_loop][chaos_report] Empty chaosReportCheckSiteURL for project={project}. Raw program: {chaos_report_program}"
@@ -152,6 +275,7 @@ while True:
response = "Connection Error"
else:
response = do_http_request(url, method, headers, payload)
check_url_counter_key = f"{chaos_report_program['chaosReportProject']}_check_url_counter"
check_url_status_code_key = f"{chaos_report_program['chaosReportProject']}_check_url_status_code"
check_url_elapsed_time_key = f"{chaos_report_program['chaosReportProject']}_check_url_elapsed_time"
@@ -173,15 +297,25 @@ while True:
logging.info(f"[k-inv][metrics_loop][chaos_report] Status code {response.status_code} while checking {chaos_report_program['chaosReportCheckSiteURL']}")
r.set(check_url_status_code_key, response.status_code)
r.set(check_url_elapsed_time_key, float(response.elapsed.total_seconds()))
api_response_items = []
try:
label_selector="chaos-controller=kubeinvaders"
api_response = api_instance.list_pod_for_all_namespaces(label_selector=label_selector)
api_response_items = api_response.items or []
except ApiException as e:
logging.debug(e)
logging.warning(f"[k-inv][metrics_loop] Kubernetes API error while listing pods: {e}")
except LocationValueError as e:
logging.warning(f"[k-inv][metrics_loop] Invalid Kubernetes host while listing pods: {e}")
api_instance, batch_api = rebuild_api_client_from_fallbacks(configuration)
except MaxRetryError as e:
logging.warning(f"[k-inv][metrics_loop] Kubernetes host unreachable while listing pods: {e}")
api_instance, batch_api = rebuild_api_client_from_fallbacks(configuration)
except Exception as e:
logging.warning(f"[k-inv][metrics_loop] Unexpected error while listing pods: {type(e).__name__}: {e}")
r.set("current_chaos_job_pod", 0)
for pod in api_response.items:
for pod in api_response_items:
if pod.status.phase == "Pending" or pod.status.phase == "Running":
logging.debug(f"[k-inv][metrics_loop] Found pod {pod.metadata.name}. It is in {pod.status.phase} phase. Incrementing current_chaos_job_pod Redis key")
r.incr('current_chaos_job_pod')

View File

@@ -10,13 +10,36 @@ fi
export PYTHONUNBUFFERED=1
if python3 -c "import urllib.request; urllib.request.urlopen('https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}', timeout=5)" 2>/dev/null; then
python3 -u /opt/metrics_loop/start.py https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS} &
while true
do
pgrep -a -f -c "^python3.*metrics_loop.*$" > /dev/null || ( python3 -u /opt/metrics_loop/start.py https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS} & )
sleep 2
done
RESOLVED_ENDPOINT=""
# 1) Endpoint configured from the web console (persisted by /kube/healthz in Redis)
REDIS_ENDPOINT=$(python3 - <<'PY'
import redis
try:
r = redis.Redis(unix_socket_path='/tmp/redis.sock', decode_responses=True)
v = r.get('k8s_api_endpoint')
print(v.strip() if v else '')
except Exception:
print('')
PY
)
if [ ! -z "$REDIS_ENDPOINT" ]; then
RESOLVED_ENDPOINT="$REDIS_ENDPOINT"
echo "Using k8s_api_endpoint from Redis: $RESOLVED_ENDPOINT"
elif [ ! -z "$ENDPOINT" ]; then
RESOLVED_ENDPOINT="$ENDPOINT"
echo "Using ENDPOINT env var: $RESOLVED_ENDPOINT"
else
RESOLVED_ENDPOINT="https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}"
echo "Using in-cluster endpoint: $RESOLVED_ENDPOINT"
fi
python3 -u /opt/metrics_loop/start.py "$RESOLVED_ENDPOINT" &
while true
do
pgrep -a -f -c "^python3.*metrics_loop.*$" > /dev/null || ( python3 -u /opt/metrics_loop/start.py "$RESOLVED_ENDPOINT" & )
sleep 2
done