mirror of
https://github.com/Danny-de-bree/fullstaq-operator.git
synced 2026-02-14 17:39:50 +00:00
120 lines
4.5 KiB
Python
120 lines
4.5 KiB
Python
import kopf
|
|
from kubernetes import client, config, watch
|
|
from config import Config
|
|
import requests
|
|
from kubernetes.client import CoreV1Api
|
|
from kubernetes.client.models.v1_pod import V1Pod
|
|
from kubernetes.client.models.v1_container_status import V1ContainerStatus
|
|
|
|
# Set up logging
|
|
logger = Config.setup_logging()
|
|
|
|
cluster = Config.cluster()
|
|
|
|
def kubeconfig() -> client.CoreV1Api:
|
|
"""This is for using the kubeconfig to auth with the k8s api
|
|
with the first try it will try to use the in-cluster config (so for in cluster use)
|
|
If it cannot find an incluster because it is running locally, it will use your local config"""
|
|
try:
|
|
# Try to load the in-cluster configuration
|
|
config.load_incluster_config()
|
|
logger.info("Loaded in-cluster configuration.")
|
|
except config.ConfigException:
|
|
# If that fails, fall back to kubeconfig file
|
|
config.load_kube_config(context=cluster)
|
|
logger.info(f"Loaded kubeconfig file with context {cluster}.")
|
|
|
|
# Check the active context
|
|
_, active_context = config.list_kube_config_contexts()
|
|
if active_context:
|
|
logger.info(f"The active context is {active_context['name']}.")
|
|
else:
|
|
logger.info("No active context.")
|
|
|
|
# Now you can use the client
|
|
api = client.CoreV1Api()
|
|
return api
|
|
|
|
def watch_namespace_pods(v1: CoreV1Api, namespace: str) -> None:
|
|
"""Watch pods in a specified namespace"""
|
|
logger.info(f"Watching namespace {namespace} on cluster {v1.api_client.configuration.host}")
|
|
w = watch.Watch()
|
|
for event in w.stream(v1.list_namespaced_pod, namespace=namespace, timeout_seconds=10):
|
|
process_pod_event(event)
|
|
|
|
|
|
def watch_labeled_pods(v1: CoreV1Api, label_selector: str) -> None:
|
|
"""Watch pods with a specific label across all namespaces"""
|
|
logger.info(f"Watching all pods on cluster {v1.api_client.configuration.host} with label {label_selector}")
|
|
w = watch.Watch()
|
|
for event in w.stream(v1.list_pod_for_all_namespaces, label_selector=label_selector, timeout_seconds=10):
|
|
process_pod_event(event)
|
|
|
|
|
|
def watch_all_pods(v1: CoreV1Api) -> None:
|
|
"""Watch all pods in all namespaces"""
|
|
logger.info(f"Watching all pods on cluster {v1.api_client.configuration.host}")
|
|
w = watch.Watch()
|
|
for event in w.stream(v1.list_pod_for_all_namespaces, timeout_seconds=10):
|
|
process_pod_event(event)
|
|
|
|
|
|
def process_pod_event(event: dict) -> None:
|
|
"""Process pod events and take necessary actions"""
|
|
if event['type'] not in ('ADDED', 'MODIFIED'):
|
|
return
|
|
|
|
pod = event['object']
|
|
logger.info(f"Event: {event['type']} {pod.kind} {pod.metadata.name} {pod.status.phase}")
|
|
|
|
if pod.status.phase in ("Failed", "Unknown"):
|
|
handle_failed_pod(pod)
|
|
|
|
for container_status in pod.status.container_statuses:
|
|
if container_status.state.waiting:
|
|
handle_failed_container(container_status, pod)
|
|
|
|
def handle_failed_pod(pod: V1Pod) -> None:
|
|
"""Handle pods in a failed state"""
|
|
logger.info("Pod is not running")
|
|
logger.info("Sending webhook to teams")
|
|
data = {"text": f"Hello! The {pod.kind} {pod.metadata.name} is in the {pod.status.phase} phase. But remember Feyenoord 1#"}
|
|
requests.post(Config.teams_url, json=data)
|
|
|
|
def handle_failed_container(container_status: V1ContainerStatus, pod: V1Pod) -> None:
|
|
"""Handle containers in a waiting state"""
|
|
error_states = ["ImagePullBackOff", "ErrImagePull", "CrashLoopBackOff"]
|
|
if container_status.state.waiting.reason in error_states:
|
|
logger.info(f"Container {container_status.name} is in a faulty state")
|
|
logger.info("Sending webhook to teams")
|
|
data = {"text": f"Hello! The container {container_status.name} in {pod.kind} {pod.metadata.name} is in the {container_status.state.waiting.reason} state. But remember Feyenoord 1#"}
|
|
requests.post(Config.teams_url, json=data)
|
|
|
|
def watch_pods_by_config(v1: CoreV1Api) -> None:
|
|
"""Watch pods based on configuration"""
|
|
if Config.namespace:
|
|
watch_namespace_pods(v1, Config.namespace)
|
|
elif Config.label:
|
|
watch_labeled_pods(v1, Config.label)
|
|
else:
|
|
watch_all_pods(v1)
|
|
|
|
@kopf.on.startup()
|
|
def on_startup(**kwargs):
|
|
"""On startup, watch pods"""
|
|
logger.info("Starting up")
|
|
v1 = kubeconfig()
|
|
watch_pods_by_config(v1)
|
|
|
|
@kopf.on.timer('pods',interval=Config.interval)
|
|
def on_timer(**kwargs):
|
|
"""On timer, watch pods"""
|
|
logger.info("Starting up")
|
|
v1 = kubeconfig()
|
|
watch_pods_by_config(v1)
|
|
|
|
def main():
|
|
kopf.run()
|
|
|
|
if __name__ == '__main__':
|
|
main() |