from asyncio.log import logger import yaml import logging import os import pathlib import sys import json from kubernetes import client, config from kubernetes.client.rest import ApiException import requests from string import Template import string import random import redis import time import re from hashlib import sha256 import time import urllib3 import datetime urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) def create_pod_list(logid, api_responses, current_regex): webtail_pods = [] json_re = json.loads(current_regex) regexsha = sha256(current_regex.encode('utf-8')).hexdigest() pod_re = json_re["pod"] namespace_re = json_re["namespace"] annotations_re = json_re["annotations"] labels_re = json_re["labels"] containers_re = json_re["containers"] for api_response in api_responses: pods_pending = 0 pods_running = 0 pods_succeeded = 0 for pod in api_response.items: if pod.status.phase == "Pending": pods_pending = pods_pending + 1 if pod.status.phase == "Running": pods_running = pods_running + 1 if pod.status.phase == "Succeeded": pods_succeeded = pods_succeeded + 1 r.set(f"log_status:{logid}", f"Pods on Pending phase: {pods_pending}\nPods on Succeeded phase: {pods_succeeded}\nPods on Running phase: {pods_running}") if r.exists(f"regex_cmp:{regexsha}:{logid}:{pod.metadata.namespace}:{pod.metadata.name}"): cached_regex_match = r.get(f"regex_cmp:{regexsha}:{logid}:{pod.metadata.namespace}:{pod.metadata.name}") if cached_regex_match == "maching": webtail_pods.append(pod) regex_match_info = f"[logid:{logid}][k-inv][logs-loop] Taking logs of {pod.metadata.name}. Redis has cached that {current_regex} is good for {pod.metadata.name}" logging.debug(f"[k-inv][regexmatch][logid:{logid}][{cached_regex_match}] IS CACHED IN REDIS") else: regex_match_info = f"[logs-loop][logid:{logid}] Skipping logs of {pod.metadata.name}. Redis has cached that {current_regex} is not good for {pod.metadata.name}" logging.debug(regex_match_info) logging.debug(f"[k-inv][regexmatch][logid:{logid}][{cached_regex_match}] IS CACHED IN REDIS") else: if re.search(r"{pod_re}", pod.metadata.name) or re.search(r"{pod_re}", pod.metadata.name) or (pod_re == pod.metadata.name) or (pod_re == pod.metadata.name): logging.debug(f"[logid:{logid}][k-in][regexmatch] |{pod_re}| |{pod.metadata.name}| MATCHED") regex_key_name = f"regex_cmp:{regexsha}:{logid}:{pod.metadata.namespace}:{pod.metadata.name}" if re.search(f"{namespace_re}", pod.metadata.namespace) or re.search(r"{namespace_re}", pod.metadata.namespace) or (namespace_re == pod.metadata.namespace): logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{namespace_re}| |{pod.metadata.namespace}| MATCHED") if re.search(f"{labels_re}", str(pod.metadata.labels)) or re.search(r"{labels_re}", str(pod.metadata.labels)): logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{labels_re}| |{str(pod.metadata.labels)}| MATCHED") if re.search(f"{annotations_re}", str(pod.metadata.annotations)) or re.search(r"{annotations_re}", str(pod.metadata.annotations)): logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{annotations_re}| |{str(pod.metadata.annotations)}| MATCHED") webtail_pods.append(pod) regex_match_info = f"[logid:{logid}] Taking logs from {pod.metadata.name}. It is compliant with the Regex {current_regex}" r.set(regex_key_name, "maching") logging.debug(regex_match_info) else: logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{annotations_re}| |{str(pod.metadata.annotations)}| FAILED") r.set(regex_key_name, "not_maching") else: logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{labels_re}| |{str(pod.metadata.labels)}| FAILED") r.set(regex_key_name, "not_maching") else: logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{namespace_re}| |{pod.metadata.namespace}| FAILED") r.set(regex_key_name, "not_maching") else: logging.debug(f"[logid:{logid}][k-inv][regexmatch] |{pod_re}| |{pod.metadata.name}| FAILED") #r.set(regex_key_name, "not_maching") return webtail_pods def log_cleaner(logid): if not r.exists(f"do_not_clean_log:{logid}"): for key in r.scan_iter(f"log_time:{logid}:*"): r.delete(key) for key in r.scan_iter(f"log:{logid}:*"): r.delete(key) r.set(f"do_not_clean_log:{logid}", "1") r.expire(f"do_not_clean_log:{logid}", 60) r.set(f"log_status:{logid}", f"[k-inv][logs-loop] Logs (id: {logid}) has been cleaned") def get_regex(logid): if not r.exists(f"log_pod_regex:{logid}"): r.set(f"log_status:{logid}", f"[k-inv][logs-loop] ERROR. Regex has not been configured or is invalid") return r.get(f"log_pod_regex:{logid}") def compute_line(api_response_line, container): logging.debug(f"[logid:{logid}] API Response: ||{api_response_line}||") logrow = f"""

Namespace
Pod
Container
{pod.metadata.namespace}
{pod.metadata.name}
{container}
{api_response_line}
""" sha256log = sha256(logrow.encode('utf-8')).hexdigest() if not r.exists(f"log:{logid}:{pod.metadata.name}:{container}:{sha256log}"): logging.debug(f"[logid:{logid}][k-inv][logs-loop] The key log:{logid}:{pod.metadata.name}:{container}:{sha256log} does not exists. Preparing to store log content") if not r.exists(f"logs:chaoslogs-{logid}-count"): r.set(f"logs:chaoslogs-{logid}-count", 0) r.set(f"logs:chaoslogs-{logid}-0", logrow) else: latest_chaos_logs_count = int(r.get(f"logs:chaoslogs-{logid}-count")) latest_chaos_logs_count = latest_chaos_logs_count + 1 r.set(f"logs:chaoslogs-{logid}-{latest_chaos_logs_count}", logrow) r.set(f"logs:chaoslogs-{logid}-count", latest_chaos_logs_count) r.set(f"log:{logid}:{pod.metadata.name}:{container}:{sha256log}", logrow) r.set(f"log_time:{logid}:{pod.metadata.name}:{container}", time.time()) r.expire(f"log:{logid}:{pod.metadata.name}:{container}:{sha256log}", 60) logging.basicConfig(level=os.environ.get("LOGLEVEL", "DEBUG")) logging.getLogger('kubernetes').setLevel(logging.ERROR) logging.debug('Starting script for KubeInvaders taking logs from pods...') file = pathlib.Path('/tmp/redis.sock') if file.exists(): r = redis.Redis(unix_socket_path='/tmp/redis.sock', charset="utf-8", decode_responses=True) else: r = redis.Redis("127.0.0.1", charset="utf-8", decode_responses=True) if os.environ.get("DEV"): logging.debug("Setting env var for dev...") r.set("log_pod_regex", '{"since": 60, "pod":".*", "namespace":"namespace1", "labels":".*", "annotations":".*", "containers": ".*"}') r.set("logs_enabled:aaaa", 1) r.expire("logs_enabled:aaaa", 10) r.set("programming_mode", 0) logging.debug(r.get("log_pod_regex:aaaa")) logging.debug(r.get("logs_enabled:aaaa")) configuration = client.Configuration() token = os.environ["TOKEN"] configuration.api_key = {"authorization": f"Bearer {token}"} configuration.host = sys.argv[1] configuration.insecure_skip_tls_verify = True configuration.verify_ssl = False client.Configuration.set_default(configuration) client.Configuration.set_default(configuration) api_instance = client.CoreV1Api() batch_api = client.BatchV1Api() namespace = "kubeinvaders" while True: for key in r.scan_iter("logs_enabled:*"): if r.get(key) == "1": logid = key.split(":")[1] logging.debug(f"Found key {key} and it is enabled.") webtail_pods = [] current_regex = get_regex(logid) if not current_regex: continue else: logging.debug(f"[k-inv][logs-loop] {key} is using this regex: {current_regex}") logging.debug(f"[logid:{logid}] Checking do_not_clean_log Redis key") log_cleaner(logid) try: json_re = json.loads(current_regex) namespace_re = json_re["namespace"] logging.debug(f"[logid:{logid}][k-inv][logs-loop] Taking list of namespaces") namespaces_list = api_instance.list_namespace() api_responses = [] for namespace in namespaces_list.items: logging.debug(f"[logid:{logid}][k-inv][logs-loop] Found namespace {namespace.metadata.name}") if re.search(f"{namespace_re}", namespace.metadata.name): logging.debug(f"[logid:{logid}][k-inv][logs-loop][NAMESPACE-MATCHING] {namespace.metadata.name}") logging.debug(f"[logid:{logid}][k-inv][logs-loop] Taking pods from namespace {namespace.metadata.name}") api_responses.append(api_instance.list_namespaced_pod(namespace.metadata.name)) except ApiException as e: logging.debug(e) webtail_pods = create_pod_list(logid, api_responses, current_regex) json_re = json.loads(current_regex) containers_re = json_re["containers"] webtail_pods_len = len(webtail_pods) if not r.exists(f"logs:chaoslogs-{logid}-count"): old_logs = r.get(f"logs:chaoslogs-{logid}-0") else: latest_chaos_logs_count = int(r.get(f"logs:chaoslogs-{logid}-count")) old_logs = r.get(f"logs:chaoslogs-{logid}-{latest_chaos_logs_count}") pod_start_time_sec = int(json_re["pod_start_time_sec"]) pod_logs_sec = int(json_re["pod_logs_sec"]) r.set(f"logs:webtail_pods_len:{logid}", webtail_pods_len) r.set(f"pods_match_regex:{logid}", webtail_pods_len) logging.debug(f"[logid:{logid}][k-inv][logs-loop] Current Regex: {current_regex}") for pod in webtail_pods: if pod.status.phase == "Unknown" and pod.status.phase == "Pending": continue logging.debug(f"[logid:{logid}][k-inv][logs-loop] Taking logs from {pod.metadata.name}") container_list = [] for container in pod.spec.containers: container_list.append(container.name) if len(container_list) == 1: only_one_container = True else: only_one_container = False for container in container_list: logging.debug(f"[logid:{logid}][k-inv][logs-loop] Listing containers of {pod.metadata.name}. Computing {container} phase: {pod.status.phase}") if pod.status.phase != "Unknown" and pod.status.phase != "Pending": logging.debug(f"[logid:{logid}][k-inv][logs-loop] Container {container} on pod {pod.metadata.name} has accepted phase for taking logs") try: if r.exists(f"log_time:{logid}:{pod.metadata.name}:{container}"): latest_log_tail_time = float(r.get(f"log_time:{logid}:{pod.metadata.name}:{container}")) since = int(time.time() - float(latest_log_tail_time)) + 1 else: latest_log_tail_time = time.time() pod_start_time = int(datetime.datetime.timestamp(pod.status.start_time)) logging.debug(f"[logid:{logid}][k-inv][logs-loop] POD's start time {pod_start_time}") since = int(time.time() - pod_logs_sec) + 1 logging.debug(f"[logid:{logid}][k-inv][logs-loop] pod_start_time_sec is {pod_start_time_sec}") if since > pod_start_time_sec: continue logging.debug(f"[logid:{logid}][k-inv][logs-loop] Time types: {type(latest_log_tail_time)} {type(time.time())} {type(since)} since={since}") logging.debug(f"[logid:{logid}][k-inv][logs-loop] Calling K8s API for reading logs of {pod.metadata.name} container {container} in namespace {pod.metadata.namespace} since {since} seconds - phase {pod.status.phase}") if only_one_container: api_response = api_instance.read_namespaced_pod_log(name=pod.metadata.name, namespace=pod.metadata.namespace, since_seconds=since) else: api_response = api_instance.read_namespaced_pod_log(name=pod.metadata.name, namespace=pod.metadata.namespace, since_seconds=since, container=container) logging.debug(f"[logid:{logid}][k-inv][logs-loop] Computing K8s API response for reading logs of {pod.metadata.name} in namespace {pod.metadata.namespace} - phase {pod.status.phase}") logging.debug(f"[logid:{logid}][k-inv][logs-loop] {type(api_response)} {api_response}") r.set(f"log_time:{logid}:{pod.metadata.name}:{container}", time.time()) if not re.search(r'[\w]+', api_response): logging.debug(f"[logid:{logid}][k-inv][logs-loop] API Response for reading logs of {pod.metadata.name} in namespace {pod.metadata.namespace} is still empty") continue compute_line(api_response, container) logs = "" if type(api_response) is list: for api_response_line in api_response: logging.debug(f"[logid:{logid}][k-inv][logs-loop] Computing log line {api_response_line}") #logs = f"{logs}
{api_response_line}" compute_line(logs, container) else: for api_response_line in api_response.splitlines(): logging.debug(f"[logid:{logid}][k-inv][logs-loop] Computing log line {api_response_line}") compute_line(logs, container) #logs = f"{logs}
{api_response_line}" except ApiException as e: logging.debug(f"[k-inv][logs-loop] EXCEPTION {e}") time.sleep(1)