mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-05-06 01:08:11 +00:00
Improvements to the API Server hunters
This commit is contained in:
@@ -9,7 +9,12 @@ from ...core.events.types import OpenPortEvent, Service, Event
|
||||
class ApiServer(Service, Event):
|
||||
"""The API server is in charge of all operations on the cluster."""
|
||||
def __init__(self):
|
||||
Service.__init__(self, name="API Server")
|
||||
Service.__init__(self, name="Accessing API Server")
|
||||
|
||||
class ApiServerWithServiceAccountToken(Service, Event):
|
||||
"""The API server is in charge of all operations on the cluster."""
|
||||
def __init__(self):
|
||||
Service.__init__(self, name="Accessing API Server using service account token")
|
||||
|
||||
|
||||
# Other devices could have this port open, but we can check to see if it looks like a Kubernetes node
|
||||
@@ -28,3 +33,6 @@ class ApiServerDiscovery(Hunter):
|
||||
if '"code"' in main_request:
|
||||
self.event.role = "Master"
|
||||
self.publish_event(ApiServer())
|
||||
|
||||
if self.event.auth_token:
|
||||
self.publish_event(ApiServerWithServiceAccountToken())
|
||||
|
||||
@@ -2,39 +2,31 @@ import logging
|
||||
import json
|
||||
import requests
|
||||
import uuid
|
||||
import copy
|
||||
|
||||
from ...core.events import handler
|
||||
from ...core.events.types import Vulnerability, Event
|
||||
from ..discovery.apiserver import ApiServer
|
||||
from ...core.types import Hunter, ActiveHunter, KubernetesCluster, RemoteCodeExec, AccessRisk, InformationDisclosure
|
||||
from ..discovery.apiserver import ApiServer, ApiServerWithServiceAccountToken
|
||||
from ...core.types import Hunter, ActiveHunter, KubernetesCluster
|
||||
from ...core.types import RemoteCodeExec, AccessRisk, InformationDisclosure, UnauthenticatedAccess
|
||||
|
||||
|
||||
""" Vulnerabilities """
|
||||
|
||||
|
||||
class ServerApiAccess(Vulnerability, Event):
|
||||
""" Accessing the server API within a compromised pod would help an attacker gain full control over the cluster"""
|
||||
|
||||
def __init__(self, evidence):
|
||||
Vulnerability.__init__(self, KubernetesCluster, name="Access to server API", category=RemoteCodeExec)
|
||||
""" The API Server port is accessible. Depending on your RBAC settings this could expose access to or control of your cluster. """
|
||||
|
||||
def __init__(self, evidence, category):
|
||||
Vulnerability.__init__(self, KubernetesCluster, name="Access to server API", category=category)
|
||||
self.evidence = evidence
|
||||
|
||||
|
||||
class ListPodUnderDefaultNamespace(Vulnerability, Event):
|
||||
""" Accessing the pods list under default namespace might give an attacker valuable
|
||||
information to harm the cluster """
|
||||
|
||||
def __init__(self, evidence):
|
||||
Vulnerability.__init__(self, KubernetesCluster, name="Listing pods list under default namespace",
|
||||
category=InformationDisclosure)
|
||||
self.evidence = evidence
|
||||
|
||||
|
||||
class ListPodUnderAllNamespaces(Vulnerability, Event):
|
||||
class ListPodsAndNamespaces(Vulnerability, Event):
|
||||
""" Accessing the pods list under ALL of the namespaces might give an attacker valuable information"""
|
||||
|
||||
def __init__(self, evidence):
|
||||
Vulnerability.__init__(self, KubernetesCluster, name="Listing pods list under ALL namespaces",
|
||||
Vulnerability.__init__(self, KubernetesCluster, name="Listing pods",
|
||||
category=InformationDisclosure)
|
||||
self.evidence = evidence
|
||||
|
||||
@@ -193,148 +185,130 @@ class DeleteAPod(Vulnerability, Event):
|
||||
|
||||
|
||||
class ApiServerPassiveHunterFinished(Event):
|
||||
def __init__(self, all_namespaces_names):
|
||||
self.all_namespaces_names = all_namespaces_names
|
||||
|
||||
def __str__(self):
|
||||
return str(self.event.auth_token)
|
||||
def __init__(self, namespaces):
|
||||
self.namespaces = namespaces
|
||||
|
||||
|
||||
# Passive Hunter
|
||||
@handler.subscribe(ApiServer)
|
||||
class AccessApiServerViaServiceAccountToken(Hunter):
|
||||
class AccessApiServer(Hunter):
|
||||
""" API Server Hunter
|
||||
Accessing the API server within a compromised pod might grant an attacker full control over the cluster
|
||||
"""
|
||||
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
self.headers = dict()
|
||||
self.path = "https://{}:{}".format(self.event.host, self.event.port)
|
||||
|
||||
self.api_server_evidence = ''
|
||||
self.pod_list_under_default_namespace_evidence = ''
|
||||
self.pod_list_under_all_namespaces_evidence = ''
|
||||
|
||||
self.all_namespaces_names_evidence = list()
|
||||
self.all_roles_names_evidence = list()
|
||||
self.roles_names_under_default_namespace_evidence = list()
|
||||
self.all_cluster_roles_names_evidence = list()
|
||||
self.namespaces_and_their_pod_names = list(dict())
|
||||
self.headers = {}
|
||||
self.category = UnauthenticatedAccess
|
||||
|
||||
def access_api_server(self):
|
||||
logging.debug(self.event.host)
|
||||
logging.debug('Passive Hunter is attempting to access the API server using the pod\'s service account token')
|
||||
logging.debug('Passive Hunter is attempting to access the API at {host}:{port}'.format(host=self.event.host,
|
||||
port=self.event.port))
|
||||
try:
|
||||
res = requests.get("{path}/api".format(path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
self.api_server_evidence = res.content
|
||||
return res.status_code == 200 and res.content != ''
|
||||
r = requests.get("{path}/api".format(path=self.path), headers=self.headers, verify=False)
|
||||
if r.status_code == 200 and r.content != '':
|
||||
return r.content
|
||||
except requests.exceptions.ConnectionError:
|
||||
return False
|
||||
pass
|
||||
return False
|
||||
|
||||
def add_token_to_headers(self):
|
||||
if self.event.auth_token != '':
|
||||
self.headers = {'Authorization': 'Bearer ' + self.event.auth_token}
|
||||
|
||||
# 1 Pods Method:
|
||||
def get_pods_list_under_requested_scope(self, scope=None):
|
||||
try:
|
||||
res = requests.get("{path}/api/v1/{scope}/pods".format(path=self.path, scope=scope),
|
||||
headers=self.headers, verify=False)
|
||||
|
||||
parsed_response_content = json.loads(res.content)
|
||||
for item in parsed_response_content["items"]:
|
||||
name = item["metadata"]["name"].encode('ascii', 'ignore')
|
||||
namespace = item["metadata"]["namespace"].encode('ascii', 'ignore')
|
||||
self.namespaces_and_their_pod_names.append({'name': name, 'namespace': namespace})
|
||||
|
||||
return res.status_code == 200
|
||||
def get_items(self, path):
|
||||
try:
|
||||
items = []
|
||||
r = requests.get(path, headers=self.headers, verify=False)
|
||||
if r.status_code ==200:
|
||||
resp = json.loads(r.content)
|
||||
for item in resp["items"]:
|
||||
items.append(item["metadata"]["name"])
|
||||
return items
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def get_pods(self, namespace=None):
|
||||
pods = []
|
||||
try:
|
||||
if namespace is None:
|
||||
r = requests.get("{path}/api/v1/pods".format(path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
else:
|
||||
r = requests.get("{path}/api/v1/namespaces/{namespace}/pods".format(path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
if r.status_code == 200:
|
||||
resp = json.loads(r.content)
|
||||
for item in resp["items"]:
|
||||
name = item["metadata"]["name"].encode('ascii', 'ignore')
|
||||
namespace = item["metadata"]["namespace"].encode('ascii', 'ignore')
|
||||
pods.append({'name': name, 'namespace': namespace})
|
||||
|
||||
return pods
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
pass
|
||||
return None
|
||||
|
||||
# 1 Namespace method:
|
||||
def get_all_namespaces(self):
|
||||
try:
|
||||
res = requests.get("{path}/api/v1/namespaces".format(path=self.path),
|
||||
headers=self.headers,
|
||||
verify=False)
|
||||
return self.get_items("{path}/api/v1/namespaces".format(path=self.path))
|
||||
|
||||
parsed_response_content = json.loads(res.content)
|
||||
for item in parsed_response_content["items"]:
|
||||
self.all_namespaces_names_evidence.append(item["metadata"]["name"].encode('ascii', 'ignore'))
|
||||
return res.status_code == 200
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
|
||||
# 3 Roles & Cluster Roles Methods:
|
||||
def get_roles_under_default_namespace(self):
|
||||
try:
|
||||
res = requests.get("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/default/roles".format(
|
||||
path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
parsed_response_content = json.loads(res.content)
|
||||
for item in parsed_response_content["items"]:
|
||||
self.roles_names_under_default_namespace_evidence.append(item["metadata"]["name"].encode('ascii', 'ignore'))
|
||||
return res.content if res.status_code == 200 else False
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/default/roles".format(path=self.path))
|
||||
|
||||
def get_all_cluster_roles(self):
|
||||
try:
|
||||
res = requests.get("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(
|
||||
path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
parsed_response_content = json.loads(res.content)
|
||||
for item in parsed_response_content["items"]:
|
||||
self.all_cluster_roles_names_evidence.append(item["metadata"]["name"].encode('ascii', 'ignore'))
|
||||
return res.content if res.status_code == 200 else False
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(path=self.path))
|
||||
|
||||
def get_all_roles(self):
|
||||
try:
|
||||
res = requests.get("{path}/apis/rbac.authorization.k8s.io/v1/roles".format(
|
||||
path=self.path),
|
||||
headers=self.headers, verify=False)
|
||||
parsed_response_content = json.loads(res.content)
|
||||
for item in parsed_response_content["items"]:
|
||||
self.all_roles_names_evidence.append(item["metadata"]["name"].encode('ascii', 'ignore'))
|
||||
return res.content if res.status_code == 200 else False
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/roles".format(path=self.path))
|
||||
|
||||
def execute(self):
|
||||
api = self.access_api_server()
|
||||
if api:
|
||||
self.publish_event(ServerApiAccess(api, self.category))
|
||||
|
||||
self.add_token_to_headers()
|
||||
namespaces = self.get_all_namespaces()
|
||||
if namespaces:
|
||||
self.publish_event(ListAllNamespaces(namespaces))
|
||||
|
||||
if self.access_api_server():
|
||||
self.publish_event(ServerApiAccess(self.api_server_evidence))
|
||||
pods = self.get_pods()
|
||||
if pods:
|
||||
# TODO!! Doesn't this give you all the pods you are entitles to read, irrespective of namespace?
|
||||
self.publish_event(ListPodsAndNamespaces(pods))
|
||||
# else:
|
||||
# if self.get_pods_list_under_requested_scope(scope='namespaces/default'):
|
||||
# self.publish_event(ListPodUnderDefaultNamespace(self.namespaces_and_their_pod_names))
|
||||
|
||||
if self.get_all_namespaces():
|
||||
self.publish_event(ListAllNamespaces(self.all_namespaces_names_evidence))
|
||||
roles = self.get_all_roles()
|
||||
if roles:
|
||||
# TODO!! Doesn't this give you access to all the roles you're entitled to?
|
||||
self.publish_event(ListAllRoles(roles))
|
||||
# else:
|
||||
# if self.get_roles_under_default_namespace():
|
||||
# self.publish_event(ListAllRolesUnderDefaultNamespace(
|
||||
# self.roles_names_under_default_namespace_evidence))
|
||||
|
||||
if self.get_pods_list_under_requested_scope():
|
||||
self.publish_event(ListPodUnderAllNamespaces(self.namespaces_and_their_pod_names))
|
||||
else:
|
||||
if self.get_pods_list_under_requested_scope(scope='namespaces/default'):
|
||||
self.publish_event(ListPodUnderDefaultNamespace(self.namespaces_and_their_pod_names))
|
||||
cluster_roles = self.get_all_cluster_roles()
|
||||
if cluster_roles:
|
||||
self.publish_event(ListAllClusterRoles(cluster_roles))
|
||||
|
||||
if self.get_all_roles():
|
||||
self.publish_event(ListAllRoles(self.all_roles_names_evidence))
|
||||
else:
|
||||
if self.get_roles_under_default_namespace():
|
||||
self.publish_event(ListAllRolesUnderDefaultNamespace(
|
||||
self.roles_names_under_default_namespace_evidence))
|
||||
if self.get_all_cluster_roles():
|
||||
self.publish_event(ListAllClusterRoles(self.all_cluster_roles_names_evidence))
|
||||
# If we have a service account token, this event should get triggered twice - once with and once without
|
||||
# the token
|
||||
self.publish_event(ApiServerPassiveHunterFinished(namespaces))
|
||||
|
||||
self.publish_event(ApiServerPassiveHunterFinished(self.all_namespaces_names_evidence))
|
||||
@handler.subscribe(ApiServerWithServiceAccountToken)
|
||||
class AccessApiServerWithToken(AccessApiServer):
|
||||
""" API Server Hunter
|
||||
Accessing the API server using the service account token obtained from a compromised pod
|
||||
"""
|
||||
|
||||
def __init__(self, event):
|
||||
super(AccessApiServerWithToken, self).__init__(event)
|
||||
assert self.event.auth_token != ''
|
||||
self.headers = {'Authorization': 'Bearer ' + self.event.auth_token}
|
||||
self.category = InformationDisclosure
|
||||
|
||||
|
||||
# Active Hunter
|
||||
@handler.subscribe(ApiServerPassiveHunterFinished)
|
||||
class AccessApiServerViaServiceAccountTokenActive(ActiveHunter):
|
||||
class AccessApiServerActive(ActiveHunter, AccessApiServer):
|
||||
"""API server hunter
|
||||
Accessing the api server might grant an attacker full control over the cluster
|
||||
"""
|
||||
@@ -343,44 +317,66 @@ class AccessApiServerViaServiceAccountTokenActive(ActiveHunter):
|
||||
self.event = event
|
||||
self.path = "https://{}:{}".format(self.event.host, self.event.port)
|
||||
|
||||
# Getting Passive hunter's data:
|
||||
self.namespaces_and_their_pod_names = dict()
|
||||
self.all_namespaces_names = set(event.all_namespaces_names)
|
||||
def create_item(self, path, name, data):
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
if self.event.auth_token:
|
||||
headers['Authorization'] = 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
|
||||
# 12 Evidences:
|
||||
self.is_privileged_pod_created = False
|
||||
self.created_pod_name_evidence = ''
|
||||
self.patched_newly_created_pod_evidence = ''
|
||||
self.deleted_newly_created_pod_evidence = ''
|
||||
try:
|
||||
res = requests.post(path.format(name=name), verify=False, data=data, headers=headers)
|
||||
if res.status_code in [200, 201, 202]:
|
||||
parsed_content = json.loads(res.content)
|
||||
return parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
pass
|
||||
return None
|
||||
|
||||
self.created_role_evidence = ''
|
||||
self.patched_newly_created_role_evidence = ''
|
||||
self.deleted_newly_created_role_evidence = ''
|
||||
def patch_item(self, path, data):
|
||||
headers = {
|
||||
'Content-Type': 'application/json-patch+json'
|
||||
}
|
||||
if self.event.auth_token:
|
||||
headers['Authorization'] = 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
try:
|
||||
res = requests.patch(path, headers=headers, verify=False, data=data)
|
||||
if res.status_code not in [200, 201, 202]:
|
||||
return None
|
||||
parsed_content = json.loads(res.content)
|
||||
# TODO is there a patch timestamp we could use?
|
||||
return parsed_content['metadata']['namespace']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
pass
|
||||
return None
|
||||
|
||||
self.created_cluster_role_evidence = ''
|
||||
self.patched_newly_created_cluster_role_evidence = ''
|
||||
self.deleted_newly_created_cluster_role_evidence = ''
|
||||
def delete_item(self, path):
|
||||
headers = {}
|
||||
if self.event.auth_token:
|
||||
headers['Authorization'] = 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
try:
|
||||
res = requests.delete(path, headers=headers, verify=False)
|
||||
if res.status_code in [200, 201, 202]:
|
||||
parsed_content = json.loads(res.content)
|
||||
return parsed_content['metadata']['deletionTimestamp']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
pass
|
||||
return None
|
||||
|
||||
self.created_new_namespace_name_evidence = ''
|
||||
self.deleted_new_namespace_name_evidence = ''
|
||||
|
||||
# 3 Pod methods:
|
||||
def create_a_pod(self, namespace, is_privileged):
|
||||
if is_privileged and self.is_privileged_pod_created: # We don't want to create more than 1 privileged pod.
|
||||
return False
|
||||
privileged_value = ',"securityContext":{"privileged":true}' if is_privileged else ''
|
||||
random_name = (str(uuid.uuid4()))[0:5]
|
||||
json_pod = \
|
||||
"""
|
||||
|
||||
{{"apiVersion": "v1",
|
||||
"kind": "Pod",
|
||||
"metadata": {{
|
||||
"name": "{random_str}"
|
||||
"name": "{random_name}"
|
||||
}},
|
||||
"spec": {{
|
||||
"containers": [
|
||||
{{
|
||||
"name": "{random_str}",
|
||||
"name": "{random_name}",
|
||||
"image": "nginx:1.7.9",
|
||||
"ports": [
|
||||
{{
|
||||
@@ -392,98 +388,37 @@ class AccessApiServerViaServiceAccountTokenActive(ActiveHunter):
|
||||
]
|
||||
}}
|
||||
}}
|
||||
|
||||
""".format(random_str=(str(uuid.uuid4()))[0:5], is_privileged_flag=privileged_value)
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.post("{path}/api/v1/namespaces/{namespace}/pods".format(
|
||||
path=self.path, namespace=namespace),
|
||||
verify=False, data=json_pod, headers=headers)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
|
||||
parsed_content = json.loads(res.content)
|
||||
self.created_pod_name_evidence = parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
if is_privileged:
|
||||
self.is_privileged_pod_created = True
|
||||
return True
|
||||
""".format(random_name=random_name, is_privileged_flag=privileged_value)
|
||||
return self.create_item(path="{path}/api/v1/namespaces/{namespace}/pods".format(
|
||||
path=self.path, namespace=namespace), name=random_name, data=json_pod)
|
||||
|
||||
def delete_a_pod(self, namespace, pod_name):
|
||||
try:
|
||||
res = requests.delete("{path}/api/v1/namespaces/{namespace}/pods/{name}".format(
|
||||
path=self.path, name=pod_name, namespace=namespace),
|
||||
headers={'Authorization': 'Bearer ' + self.event.auth_token}, verify=False)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.deleted_newly_created_pod_evidence = parsed_content['metadata']['deletionTimestamp']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
delete_timestamp = self.delete_item("{path}/api/v1/namespaces/{namespace}/pods/{name}".format(
|
||||
path=self.path, name=pod_name, namespace=namespace))
|
||||
if delete_timestamp is None:
|
||||
logging.error("Created pod {name} in namespace {namespace} but unable to delete it".format(name=pod_name, namespace=namespace))
|
||||
return delete_timestamp
|
||||
|
||||
def patch_a_pod(self, namespace, pod_name):
|
||||
# Initialize request variables:
|
||||
patch_data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
headers = {
|
||||
'Content-Type': 'application/json-patch+json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.patch("{path}/api/v1/namespaces/{namespace}/pods/{name}".format(
|
||||
data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
return self.patch_item(path="{path}/api/v1/namespaces/{namespace}/pods/{name}".format(
|
||||
path=self.path, namespace=namespace, name=pod_name),
|
||||
headers=headers, verify=False, data=patch_data)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.patched_newly_created_pod_evidence = parsed_content['metadata']['namespace']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
data=data)
|
||||
|
||||
# 2 Namespaces methods:
|
||||
def create_namespace(self):
|
||||
# Initialize request variables:
|
||||
json_namespace = '{{"kind":"Namespace","apiVersion":"v1","metadata":{{"name":"{random_str}","labels":{{"name":"{random_str}"}}}}}}'.format(random_str=(str(uuid.uuid4()))[0:5])
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.post("{path}/api/v1/namespaces".format(
|
||||
path=self.path),
|
||||
verify=False, data=json_namespace, headers=headers)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.created_new_namespace_name_evidence = parsed_content['metadata']['name']
|
||||
self.all_namespaces_names.add(self.created_new_namespace_name_evidence)
|
||||
except (requests.exceptions.ConnectionError, KeyError): # e.g. DNS failure, refused connection, etc
|
||||
return False
|
||||
return True
|
||||
random_name = (str(uuid.uuid4()))[0:5]
|
||||
json = '{{"kind":"Namespace","apiVersion":"v1","metadata":{{"name":"{random_str}","labels":{{"name":"{random_str}"}}}}}}'.format(random_str=random_name)
|
||||
return self.create_item(path="{path}/api/v1/namespaces".format(path=self.path), name=random_name, data=json)
|
||||
|
||||
def delete_namespace(self, namespace):
|
||||
delete_timestamp = self.delete_item("{path}/api/v1/namespaces/{name}".format(path=self.path, name=namespace))
|
||||
if delete_timestamp is None:
|
||||
logging.error("Created namespace {namespace} but unable to delete it".format(namespace=namespace))
|
||||
return delete_timestamp
|
||||
|
||||
# 2 Namespaces methods:
|
||||
def delete_namespace(self):
|
||||
# Initialize request header:
|
||||
headers = {
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.delete("{path}/api/v1/namespaces/{name}".format(
|
||||
path=self.path, name=self.created_new_namespace_name_evidence),
|
||||
verify=False, headers=headers)
|
||||
if res.status_code != 200: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.deleted_new_namespace_name_evidence = parsed_content['metadata']['name']
|
||||
self.all_namespaces_names.remove(self.created_new_namespace_name_evidence)
|
||||
except (requests.exceptions.ConnectionError, KeyError): # e.g. DNS failure, refused connection, etc
|
||||
return False
|
||||
return True
|
||||
|
||||
# 6 Roles & Cluster roles Methods:
|
||||
def create_a_role(self, namespace):
|
||||
# Initialize request variables:
|
||||
role_json = """{{
|
||||
name = (str(uuid.uuid4()))[0:5]
|
||||
role = """{{
|
||||
"kind": "Role",
|
||||
"apiVersion": "rbac.authorization.k8s.io/v1",
|
||||
"metadata": {{
|
||||
@@ -505,25 +440,13 @@ class AccessApiServerViaServiceAccountTokenActive(ActiveHunter):
|
||||
]
|
||||
}}
|
||||
]
|
||||
}}""".format(random_str=(str(uuid.uuid4()))[0:5], namespace=namespace)
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.post("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles".format(
|
||||
path=self.path, namespace=namespace),
|
||||
headers=headers, verify=False, data=role_json)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.created_role_evidence = parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
|
||||
}}""".format(random_str=name, namespace=namespace)
|
||||
return self.create_item(path="{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles".format(
|
||||
path=self.path, namespace=namespace), name=name, data=role)
|
||||
|
||||
def create_a_cluster_role(self):
|
||||
# Initialize request variables:
|
||||
cluster_role_json = """{{
|
||||
name = (str(uuid.uuid4()))[0:5]
|
||||
cluster_role = """{{
|
||||
"kind": "ClusterRole",
|
||||
"apiVersion": "rbac.authorization.k8s.io/v1",
|
||||
"metadata": {{
|
||||
@@ -544,140 +467,101 @@ class AccessApiServerViaServiceAccountTokenActive(ActiveHunter):
|
||||
]
|
||||
}}
|
||||
]
|
||||
}}""".format(random_str=(str(uuid.uuid4()))[0:5])
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.post("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(
|
||||
path=self.path),
|
||||
headers=headers, verify=False, data=cluster_role_json)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.created_cluster_role_evidence = parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
}}""".format(random_str=name)
|
||||
return self.create_item(path="{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(
|
||||
path=self.path), name=name, data=cluster_role)
|
||||
|
||||
def delete_a_role(self, namespace, newly_created_role_name):
|
||||
try:
|
||||
res = requests.delete("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles/{role}".format(
|
||||
path=self.path, namespace=namespace, role=newly_created_role_name),
|
||||
headers={'Authorization': 'Bearer ' + self.event.auth_token}, verify=False)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.deleted_newly_created_role_evidence = parsed_content["status"]
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
def delete_a_role(self, namespace, name):
|
||||
delete_timestamp = self.delete_item("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles/{role}".format(
|
||||
path=self.path, name=namespace, role=name))
|
||||
if delete_timestamp is None:
|
||||
logging.error("Created role {name} in namespace {namespace} but unable to delete it".format(name=name, namespace=namespace))
|
||||
return delete_timestamp
|
||||
|
||||
def delete_a_cluster_role(self, newly_created_cluster_role_name):
|
||||
try:
|
||||
res = requests.delete("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles/{name}".format(
|
||||
path=self.path, name=newly_created_cluster_role_name),
|
||||
headers={'Authorization': 'Bearer ' + self.event.auth_token}, verify=False)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.deleted_newly_created_cluster_role_evidence = parsed_content["status"]
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
def delete_a_cluster_role(self, name):
|
||||
delete_timestamp = self.delete_item("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles/{role}".format(
|
||||
path=self.path, role=name))
|
||||
if delete_timestamp is None:
|
||||
logging.error("Created cluster role {name} but unable to delete it".format(name=name))
|
||||
return delete_timestamp
|
||||
|
||||
def patch_a_role(self, namespace, newly_created_role_name):
|
||||
# Initialize request variables:
|
||||
patch_data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
headers = {
|
||||
'Content-Type': 'application/json-patch+json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.patch("{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles/{name}".format(
|
||||
path=self.path, name=newly_created_role_name,
|
||||
namespace=namespace),
|
||||
headers=headers,
|
||||
verify=False, data=patch_data)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.patched_newly_created_role_evidence = parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
def patch_a_role(self, namespace, role):
|
||||
data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
return self.patch_item(path="{path}/apis/rbac.authorization.k8s.io/v1/namespaces/{namespace}/roles/{name}".format(
|
||||
path=self.path, name=role, namespace=namespace),
|
||||
data=data)
|
||||
|
||||
def patch_a_cluster_role(self, newly_created_cluster_role_name):
|
||||
|
||||
patch_data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
headers = {
|
||||
'Content-Type': 'application/json-patch+json',
|
||||
'Authorization': 'Bearer {token}'.format(token=self.event.auth_token)
|
||||
}
|
||||
try:
|
||||
res = requests.patch("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles/{name}".format(
|
||||
path=self.path, name=newly_created_cluster_role_name),
|
||||
headers=headers,
|
||||
verify=False, data=patch_data)
|
||||
if res.status_code not in [200, 201, 202]: return False
|
||||
parsed_content = json.loads(res.content)
|
||||
self.patched_newly_created_cluster_role_evidence = parsed_content['metadata']['name']
|
||||
except (requests.exceptions.ConnectionError, KeyError):
|
||||
return False
|
||||
return True
|
||||
def patch_a_cluster_role(self, cluster_role):
|
||||
data = '[{ "op": "add", "path": "/hello", "value": ["world"] }]'
|
||||
return self.patch_item(path="{path}/apis/rbac.authorization.k8s.io/v1/clusterroles/{name}".format(
|
||||
path=self.path, name=cluster_role),
|
||||
data=data)
|
||||
|
||||
def execute(self):
|
||||
if self.event.auth_token != '':
|
||||
# Namespaces Api Calls:
|
||||
if self.create_namespace():
|
||||
self.publish_event(CreateANamespace('new namespace name: {name}'.
|
||||
format(name=self.created_new_namespace_name_evidence)))
|
||||
if self.delete_namespace():
|
||||
self.publish_event(DeleteANamespace(self.deleted_new_namespace_name_evidence))
|
||||
namespace = self.create_namespace()
|
||||
if namespace:
|
||||
self.publish_event(CreateANamespace('new namespace name: {name}'.format(name=namespace)))
|
||||
delete_timestamp = self.delete_namespace(namespace)
|
||||
if delete_timestamp:
|
||||
self.publish_event(DeleteANamespace(delete_timestamp))
|
||||
|
||||
# Cluster Roles Api Calls:
|
||||
if self.create_a_cluster_role():
|
||||
self.publish_event(CreateAClusterRole('Cluster role name: {name}'.format(
|
||||
name=self.created_cluster_role_evidence)))
|
||||
if self.patch_a_cluster_role(self.created_cluster_role_evidence):
|
||||
cluster_role = self.create_a_cluster_role()
|
||||
if cluster_role:
|
||||
self.publish_event(CreateAClusterRole('Cluster role name: {name}'.format(name=cluster_role)))
|
||||
|
||||
self.publish_event(PatchAClusterRole('Patched Cluster Role Name: {name}'.format(
|
||||
name=self.patched_newly_created_cluster_role_evidence)))
|
||||
patch_evidence = self.patch_a_cluster_role(cluster_role)
|
||||
if patch_evidence:
|
||||
self.publish_event(PatchAClusterRole('Patched Cluster Role Name: {name} Patch evidence: {patch_evidence}'.format(
|
||||
name=cluster_role, patch_evidence=patch_evidence)))
|
||||
|
||||
if self.delete_a_cluster_role(self.created_cluster_role_evidence):
|
||||
self.publish_event(DeleteAClusterRole('Cluster role status: {status}'.format(
|
||||
status=self.deleted_newly_created_cluster_role_evidence)))
|
||||
delete_timestamp = self.delete_a_cluster_role(cluster_role)
|
||||
if delete_timestamp:
|
||||
self.publish_event(DeleteAClusterRole('Cluster role {name} deletion time {delete_timestamp}'.format(
|
||||
name=cluster_role, delete_timestamp=delete_timestamp)))
|
||||
|
||||
# Operating on pods over all namespaces:
|
||||
for namespace in self.all_namespaces_names:
|
||||
# Pods Api Calls:
|
||||
if self.create_a_pod(namespace, True) or self.create_a_pod(namespace, False):
|
||||
# Try attacking all the namespaces we know about
|
||||
for namespace in self.event.namespaces:
|
||||
# Try creating and deleting a privileged pod
|
||||
pod_name = self.create_a_pod(namespace, True)
|
||||
if pod_name:
|
||||
self.publish_event(CreateAPrivilegedPod('Pod Name: {pod_name} Namespace: {namespace}'.format(
|
||||
pod_name=pod_name, namespace=namespace)))
|
||||
delete_time = self.delete_a_pod(namespace, pod_name)
|
||||
if delete_time:
|
||||
self.publish_event(DeleteAPod('Pod Name: {pod_name} deletion time: {delete_time}'.format(
|
||||
pod_name=pod_name, delete_evidence=delete_time)))
|
||||
|
||||
# Try creating, patching and deleting an unprivileged pod
|
||||
pod_name = self.create_a_pod(namespace, False)
|
||||
if pod_name:
|
||||
self.publish_event(CreateAPod('Pod Name: {pod_name} Namespace: {namespace}'.format(
|
||||
pod_name=pod_name, namespace=namespace)))
|
||||
|
||||
if self.is_privileged_pod_created:
|
||||
self.publish_event(CreateAPrivilegedPod('Pod Name: {pod_name} Pod Namespace: {pod_namespace}'.format(
|
||||
pod_name=self.created_pod_name_evidence, pod_namespace=namespace)))
|
||||
else:
|
||||
self.publish_event(CreateAPod('Pod Name: {pod_name} Pod Namespace: {pod_namespace}'.format(
|
||||
pod_name=self.created_pod_name_evidence, pod_namespace=namespace)))
|
||||
patch_evidence = self.patch_a_pod(namespace, pod_name)
|
||||
if patch_evidence:
|
||||
self.publish_event(PatchAPod('Pod Name: {pod_name} Namespace: {namespace} Patch evidence: {patch_evidence}'.format(
|
||||
pod_name=pod_name, namespace=namespace,
|
||||
patch_evidence=patch_evidence)))
|
||||
|
||||
if self.patch_a_pod(namespace, self.created_pod_name_evidence):
|
||||
self.publish_event(PatchAPod('Pod Name: {pod_name} Pod namespace: {patch_evidence}'.format(
|
||||
pod_name=self.created_pod_name_evidence,
|
||||
patch_evidence=self.patched_newly_created_pod_evidence)))
|
||||
delete_time = self.delete_a_pod(namespace, pod_name)
|
||||
if delete_time:
|
||||
self.publish_event(DeleteAPod('Pod Name: {pod_name} Namespace: {namespace} Delete time: {delete_time}'.format(
|
||||
pod_name=pod_name, namespace=namespace, delete_time=delete_time)))
|
||||
|
||||
if self.delete_a_pod(namespace, self.created_pod_name_evidence):
|
||||
self.publish_event(DeleteAPod('Pod Name: {pod_name} deletion time: {delete_evidence}'.format(
|
||||
pod_name=self.created_pod_name_evidence,
|
||||
delete_evidence=self.deleted_newly_created_pod_evidence)))
|
||||
# Roles Api Calls:
|
||||
if self.create_a_role(namespace):
|
||||
self.publish_event(CreateARole('Role name: {name}'.format(
|
||||
name=self.created_role_evidence)))
|
||||
# Roles Api Calls:
|
||||
role = self.create_a_role(namespace)
|
||||
if role:
|
||||
self.publish_event(CreateARole('Role name: {name}'.format(name=role)))
|
||||
|
||||
if self.patch_a_role(namespace, self.created_role_evidence):
|
||||
self.publish_event(PatchARole('Patched Role Name: {name}'.format(
|
||||
name=self.patched_newly_created_role_evidence)))
|
||||
patch_evidence = self.patch_a_role(namespace, role)
|
||||
if patch_evidence:
|
||||
self.publish_event(PatchARole('Patched Role Name: {name} Namespace: {namespace} Patch evidence: {patch_evidence}'.format(
|
||||
name=role, namespace=namespace, patch_evidence=patch_evidence)))
|
||||
|
||||
if self.delete_a_role(namespace, self.created_role_evidence):
|
||||
self.publish_event(DeleteARole('Role Status response: {status}'.format(
|
||||
status=self.deleted_newly_created_role_evidence)))
|
||||
delete_time = self.delete_a_role(namespace, role)
|
||||
if delete_time:
|
||||
self.publish_event(DeleteARole('Deleted role: {name} Namespace: {namespace} Delete time: {delete_time}'.format(
|
||||
name=role, namespace=namespace, delete_time=delete_time)))
|
||||
|
||||
|
||||
# Note: we are not binding any role or cluster role because
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
import requests_mock
|
||||
|
||||
from apiserver import AccessApiServerViaServiceAccountToken
|
||||
from ..discovery.apiserver import ApiServer
|
||||
from ...core.events.types import Event
|
||||
from ...core.events import handler
|
||||
|
||||
def test_ApiServer():
|
||||
|
||||
e = ApiServer()
|
||||
e.host = "1.2.3.4"
|
||||
e.auth_token = "my-secret-token"
|
||||
|
||||
# Test that the pod's token is passed on through the event
|
||||
h = AccessApiServerViaServiceAccountToken(e)
|
||||
assert h.event.auth_token == "my-secret-token"
|
||||
@@ -1,6 +1,6 @@
|
||||
import requests_mock
|
||||
|
||||
from src.modules.discovery.apiserver import ApiServer, ApiServerDiscovery
|
||||
from src.modules.discovery.apiserver import ApiServer, ApiServerDiscovery, ApiServerWithServiceAccountToken
|
||||
from src.core.events.types import Event
|
||||
from src.core.events import handler
|
||||
|
||||
@@ -20,8 +20,26 @@ def test_ApiServer():
|
||||
e.host = 'mockKubernetes'
|
||||
a.execute()
|
||||
|
||||
def test_ApiServerWithServiceAccountToken():
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get('https://mockKubernetes:443', text='{"code":403}')
|
||||
|
||||
e = Event()
|
||||
e.port = 443
|
||||
e.host = 'mockKubernetes'
|
||||
e.auth_token = "very_secret"
|
||||
|
||||
a = ApiServerDiscovery(e)
|
||||
a.execute()
|
||||
|
||||
# We should only generate an ApiServer event for a response that looks like it came from a Kubernetes node
|
||||
@handler.subscribe(ApiServer)
|
||||
class testApiServer(object):
|
||||
def __init__(self, event):
|
||||
assert event.host == 'mockKubernetes'
|
||||
|
||||
@handler.subscribe(ApiServerWithServiceAccountToken)
|
||||
class testApiServerWithServiceAccountToken(object):
|
||||
def __init__(self, event):
|
||||
assert event.host == 'mockKubernetes'
|
||||
assert event.auth_token == "very_secret"
|
||||
|
||||
176
tests/hunting/test_apiserver_hunter.py
Normal file
176
tests/hunting/test_apiserver_hunter.py
Normal file
@@ -0,0 +1,176 @@
|
||||
import requests_mock
|
||||
|
||||
from src.modules.hunting.apiserver import AccessApiServer, AccessApiServerWithToken, ServerApiAccess, AccessApiServerActive
|
||||
from src.modules.hunting.apiserver import ListAllNamespaces, ListPodsAndNamespaces, ListAllRoles, ListAllClusterRoles
|
||||
from src.modules.hunting.apiserver import ApiServerPassiveHunterFinished
|
||||
from src.modules.hunting.apiserver import CreateANamespace, DeleteANamespace
|
||||
from src.modules.discovery.apiserver import ApiServer
|
||||
from src.core.events.types import Event
|
||||
from src.core.types import UnauthenticatedAccess, InformationDisclosure
|
||||
from src.core.events import handler
|
||||
|
||||
def test_ApiServerToken():
|
||||
|
||||
e = ApiServer()
|
||||
e.host = "1.2.3.4"
|
||||
e.auth_token = "my-secret-token"
|
||||
|
||||
# Test that the pod's token is passed on through the event
|
||||
h = AccessApiServerWithToken(e)
|
||||
assert h.event.auth_token == "my-secret-token"
|
||||
|
||||
def test_AccessApiServer():
|
||||
e = ApiServer()
|
||||
e.host = "mockKubernetes"
|
||||
e.port = 443
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
# TODO check that these responses reflect what Kubernetes does
|
||||
m.get('https://mockKubernetes:443/api', text='{}')
|
||||
m.get('https://mockKubernetes:443/api/v1/namespaces', text='{"items":[{"metadata":{"name":"hello"}}]}')
|
||||
m.get('https://mockKubernetes:443/api/v1/pods',
|
||||
text='{"items":[{"metadata":{"name":"podA", "namespace":"namespaceA"}}, \
|
||||
{"metadata":{"name":"podB", "namespace":"namespaceB"}}]}')
|
||||
m.get('https://mockkubernetes:443/apis/rbac.authorization.k8s.io/v1/roles', status_code=403)
|
||||
m.get('https://mockkubernetes:443/apis/rbac.authorization.k8s.io/v1/clusterroles', text='{"items":[]}')
|
||||
|
||||
h = AccessApiServer(e)
|
||||
h.execute()
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
# TODO check that these responses reflect what Kubernetes does
|
||||
m.get('https://mockKubernetesToken:443/api', text='{}')
|
||||
m.get('https://mockKubernetesToken:443/api/v1/namespaces', text='{"items":[{"metadata":{"name":"hello"}}]}')
|
||||
m.get('https://mockKubernetesToken:443/api/v1/pods',
|
||||
text='{"items":[{"metadata":{"name":"podA", "namespace":"namespaceA"}}, \
|
||||
{"metadata":{"name":"podB", "namespace":"namespaceB"}}]}')
|
||||
m.get('https://mockkubernetesToken:443/apis/rbac.authorization.k8s.io/v1/roles', status_code=403)
|
||||
m.get('https://mockkubernetesToken:443/apis/rbac.authorization.k8s.io/v1/clusterroles',
|
||||
text='{"items":[{"metadata":{"name":"my-role"}}]}')
|
||||
|
||||
e.auth_token = "so-secret"
|
||||
e.host = "mockKubernetesToken"
|
||||
h = AccessApiServerWithToken(e)
|
||||
h.execute()
|
||||
|
||||
|
||||
@handler.subscribe(ListAllNamespaces)
|
||||
class test_ListAllNamespaces(object):
|
||||
def __init__(self, event):
|
||||
assert event.evidence == ['hello']
|
||||
if event.host == "mockKubernetesToken":
|
||||
assert event.auth_token == "so-secret"
|
||||
else:
|
||||
assert event.auth_token is None
|
||||
|
||||
|
||||
@handler.subscribe(ListPodsAndNamespaces)
|
||||
class test_ListPodsAndNamespaces(object):
|
||||
def __init__(self, event):
|
||||
assert len(event.evidence) == 2
|
||||
for pod in event.evidence:
|
||||
if pod["name"] == "podA":
|
||||
assert pod["namespace"] == "namespaceA"
|
||||
if pod["name"] == "podB":
|
||||
assert pod["namespace"] == "namespaceB"
|
||||
if event.host == "mockKubernetesToken":
|
||||
assert event.auth_token == "so-secret"
|
||||
else:
|
||||
assert event.auth_token is None
|
||||
|
||||
# Should never see this because the API call in the test returns 403 status code
|
||||
@handler.subscribe(ListAllRoles)
|
||||
class test_ListAllRoles(object):
|
||||
def __init__(self, event):
|
||||
assert 0
|
||||
|
||||
# Should only see this when we have a token because the API call returns an empty list of items
|
||||
# in the test where we have no token
|
||||
@handler.subscribe(ListAllClusterRoles)
|
||||
class test_ListAllClusterRoles(object):
|
||||
def __init__(self, event):
|
||||
assert event.auth_token == "so-secret"
|
||||
|
||||
@handler.subscribe(ServerApiAccess)
|
||||
class test_ServerApiAccess(object):
|
||||
def __init__(self, event):
|
||||
if event.category == UnauthenticatedAccess:
|
||||
assert event.auth_token is None
|
||||
else:
|
||||
assert event.category == InformationDisclosure
|
||||
assert event.auth_token is not None
|
||||
|
||||
@handler.subscribe(ApiServerPassiveHunterFinished)
|
||||
class test_PassiveHunterFinished(object):
|
||||
def __init__(self, event):
|
||||
assert event.namespaces == ["hello"]
|
||||
|
||||
def test_AccessApiServerActive():
|
||||
e = ApiServerPassiveHunterFinished(namespaces=["hello-namespace"])
|
||||
e.host = "mockKubernetes"
|
||||
e.port = 443
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
# TODO check that these responses reflect what Kubernetes does
|
||||
m.post('https://mockKubernetes:443/api/v1/namespaces', text="""
|
||||
{
|
||||
"kind": "Namespace",
|
||||
"apiVersion": "v1",
|
||||
"metadata": {
|
||||
"name": "abcde",
|
||||
"selfLink": "/api/v1/namespaces/abcde",
|
||||
"uid": "4a7aa47c-39ba-11e9-ab46-08002781145e",
|
||||
"resourceVersion": "694180",
|
||||
"creationTimestamp": "2019-02-26T11:33:08Z"
|
||||
},
|
||||
"spec": {
|
||||
"finalizers": [
|
||||
"kubernetes"
|
||||
]
|
||||
},
|
||||
"status": {
|
||||
"phase": "Active"
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
m.post('https://mockKubernetes:443/api/v1/clusterroles', text='{}')
|
||||
m.post('https://mockkubernetes:443/apis/rbac.authorization.k8s.io/v1/clusterroles', text='{}')
|
||||
m.post('https://mockkubernetes:443/api/v1/namespaces/hello-namespace/pods', text='{}')
|
||||
m.post('https://mockkubernetes:443/apis/rbac.authorization.k8s.io/v1/namespaces/hello-namespace/roles', text='{}')
|
||||
|
||||
m.delete('https://mockKubernetes:443/api/v1/namespaces/abcde', text="""
|
||||
{
|
||||
"kind": "Namespace",
|
||||
"apiVersion": "v1",
|
||||
"metadata": {
|
||||
"name": "abcde",
|
||||
"selfLink": "/api/v1/namespaces/abcde",
|
||||
"uid": "4a7aa47c-39ba-11e9-ab46-08002781145e",
|
||||
"resourceVersion": "694780",
|
||||
"creationTimestamp": "2019-02-26T11:33:08Z",
|
||||
"deletionTimestamp": "2019-02-26T11:40:58Z"
|
||||
},
|
||||
"spec": {
|
||||
"finalizers": [
|
||||
"kubernetes"
|
||||
]
|
||||
},
|
||||
"status": {
|
||||
"phase": "Terminating"
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
h = AccessApiServerActive(e)
|
||||
h.execute()
|
||||
|
||||
@handler.subscribe(CreateANamespace)
|
||||
class test_CreateANamespace(object):
|
||||
def __init__(self, event):
|
||||
assert "abcde" in event.evidence
|
||||
|
||||
@handler.subscribe(DeleteANamespace)
|
||||
class test_DeleteANamespace(object):
|
||||
def __init__(self, event):
|
||||
assert "2019-02-26" in event.evidence
|
||||
Reference in New Issue
Block a user