diff --git a/docs/_kb/KHV052.md b/docs/_kb/KHV052.md new file mode 100644 index 0000000..711f13d --- /dev/null +++ b/docs/_kb/KHV052.md @@ -0,0 +1,23 @@ +--- +vid: KHV052 +title: Exposed Pods +categories: [Information Disclosure] +--- + +# {{ page.vid }} - {{ page.title }} + +## Issue description + +An attacker could view sensitive information about pods that are bound to a Node using the exposed /pods endpoint +This can be done either by accessing the readonly port (default 10255), or from the secure kubelet port (10250) + +## Remediation + +Ensure kubelet is protected using `--anonymous-auth=false` kubelet flag. Allow only legitimate users using `--client-ca-file` or `--authentication-token-webhook` kubelet flags. This is usually done by the installer or cloud provider. + +Disable the readonly port by using `--read-only-port=0` kubelet flag. + +## References + +- [Kubelet configuration](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/) +- [Kubelet authentication/authorization](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-authentication-authorization/) \ No newline at end of file diff --git a/kube_hunter/__main__.py b/kube_hunter/__main__.py index ba45e80..9002bee 100755 --- a/kube_hunter/__main__.py +++ b/kube_hunter/__main__.py @@ -73,13 +73,13 @@ def list_hunters(): print("\nPassive Hunters:\n----------------") for hunter, docs in handler.passive_hunters.items(): name, doc = hunter.parse_docs(docs) - print("* {}\n {}\n".format(name, doc)) + print(f"* {name}\n {doc}\n") if config.active: print("\n\nActive Hunters:\n---------------") for hunter, docs in handler.active_hunters.items(): name, doc = hunter.parse_docs(docs) - print("* {}\n {}\n".format(name, doc)) + print(f"* {name}\n {doc}\n") hunt_started_lock = threading.Lock() diff --git a/kube_hunter/core/events/handler.py b/kube_hunter/core/events/handler.py index b705826..24eb1dc 100644 --- a/kube_hunter/core/events/handler.py +++ b/kube_hunter/core/events/handler.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) # Inherits Queue object, handles events asynchronously class EventQueue(Queue): def __init__(self, num_worker=10): - super(EventQueue, self).__init__() + super().__init__() self.passive_hunters = dict() self.active_hunters = dict() self.all_hunters = dict() diff --git a/kube_hunter/modules/discovery/hosts.py b/kube_hunter/modules/discovery/hosts.py index ddf86bb..afc1594 100644 --- a/kube_hunter/modules/discovery/hosts.py +++ b/kube_hunter/modules/discovery/hosts.py @@ -37,7 +37,7 @@ class RunningAsPodEvent(Event): try: with open(f"/var/run/secrets/kubernetes.io/serviceaccount/{file}") as f: return f.read() - except IOError: + except OSError: pass @@ -53,7 +53,7 @@ class AzureMetadataApi(Vulnerability, Event): vid="KHV003", ) self.cidr = cidr - self.evidence = "cidr: {}".format(cidr) + self.evidence = f"cidr: {cidr}" class HostScanEvent(Event): diff --git a/kube_hunter/modules/hunting/apiserver.py b/kube_hunter/modules/hunting/apiserver.py index f8f78ba..4dfcc19 100644 --- a/kube_hunter/modules/hunting/apiserver.py +++ b/kube_hunter/modules/hunting/apiserver.py @@ -343,7 +343,7 @@ class AccessApiServer(Hunter): else: self.publish_event(ServerApiAccess(api, self.with_token)) - namespaces = self.get_items("{path}/api/v1/namespaces".format(path=self.path)) + namespaces = self.get_items(f"{self.path}/api/v1/namespaces") if namespaces: self.publish_event(ListNamespaces(namespaces, self.with_token)) @@ -371,7 +371,7 @@ class AccessApiServerWithToken(AccessApiServer): """ def __init__(self, event): - super(AccessApiServerWithToken, self).__init__(event) + super().__init__(event) assert self.event.auth_token self.headers = {"Authorization": f"Bearer {self.event.auth_token}"} self.category = InformationDisclosure diff --git a/kube_hunter/modules/hunting/arp.py b/kube_hunter/modules/hunting/arp.py index b8c1ea3..a1a0bbd 100644 --- a/kube_hunter/modules/hunting/arp.py +++ b/kube_hunter/modules/hunting/arp.py @@ -43,7 +43,7 @@ class ArpSpoofHunter(ActiveHunter): def detect_l3_on_host(self, arp_responses): """ returns True for an existence of an L3 network plugin """ logger.debug("Attempting to detect L3 network plugin using ARP") - unique_macs = list(set(response[ARP].hwsrc for _, response in arp_responses)) + unique_macs = list({response[ARP].hwsrc for _, response in arp_responses}) # if LAN addresses not unique if len(unique_macs) == 1: diff --git a/kube_hunter/modules/hunting/certificates.py b/kube_hunter/modules/hunting/certificates.py index bfbffe4..61c8f15 100644 --- a/kube_hunter/modules/hunting/certificates.py +++ b/kube_hunter/modules/hunting/certificates.py @@ -23,7 +23,7 @@ class CertificateEmail(Vulnerability, Event): vid="KHV021", ) self.email = email - self.evidence = "email: {}".format(self.email) + self.evidence = f"email: {self.email}" @handler.subscribe(Service) diff --git a/kube_hunter/modules/hunting/cves.py b/kube_hunter/modules/hunting/cves.py index 697bab1..2e8ffc8 100644 --- a/kube_hunter/modules/hunting/cves.py +++ b/kube_hunter/modules/hunting/cves.py @@ -104,7 +104,7 @@ class IncompleteFixToKubectlCpVulnerability(Vulnerability, Event): vid="KHV027", ) self.binary_version = binary_version - self.evidence = "kubectl version: {}".format(self.binary_version) + self.evidence = f"kubectl version: {self.binary_version}" class KubectlCpVulnerability(Vulnerability, Event): @@ -120,7 +120,7 @@ class KubectlCpVulnerability(Vulnerability, Event): vid="KHV028", ) self.binary_version = binary_version - self.evidence = "kubectl version: {}".format(self.binary_version) + self.evidence = f"kubectl version: {self.binary_version}" class CveUtils: diff --git a/kube_hunter/modules/hunting/dns.py b/kube_hunter/modules/hunting/dns.py index 3705bd9..6635b37 100644 --- a/kube_hunter/modules/hunting/dns.py +++ b/kube_hunter/modules/hunting/dns.py @@ -25,7 +25,7 @@ class PossibleDnsSpoofing(Vulnerability, Event): vid="KHV030", ) self.kubedns_pod_ip = kubedns_pod_ip - self.evidence = "kube-dns at: {}".format(self.kubedns_pod_ip) + self.evidence = f"kube-dns at: {self.kubedns_pod_ip}" # Only triggered with RunningAsPod base event diff --git a/kube_hunter/modules/hunting/kubelet.py b/kube_hunter/modules/hunting/kubelet.py index b6808f4..a2c0c82 100644 --- a/kube_hunter/modules/hunting/kubelet.py +++ b/kube_hunter/modules/hunting/kubelet.py @@ -35,10 +35,7 @@ class ExposedPodsHandler(Vulnerability, Event): def __init__(self, pods): Vulnerability.__init__( - self, - component=Kubelet, - name="Exposed Pods", - category=InformationDisclosure, + self, component=Kubelet, name="Exposed Pods", category=InformationDisclosure, vid="KHV052" ) self.pods = pods self.evidence = f"count: {len(self.pods)}" @@ -84,7 +81,7 @@ class ExposedRunningPodsHandler(Vulnerability, Event): vid="KHV038", ) self.count = count - self.evidence = "{} running pods".format(self.count) + self.evidence = f"{self.count} running pods" class ExposedExecHandler(Vulnerability, Event): @@ -533,7 +530,7 @@ class ProveAnonymousAuth(ActiveHunter): def __init__(self, event): self.event = event - self.base_url = "https://{host}:10250/".format(host=self.event.host) + self.base_url = f"https://{self.event.host}:10250/" def get_request(self, url, verify=False): config = get_config() @@ -572,7 +569,7 @@ class ProveAnonymousAuth(ActiveHunter): return ProveAnonymousAuth.has_no_error(result) and ProveAnonymousAuth.has_no_exception(result) def cat_command(self, run_request_url, full_file_path): - return self.post_request(run_request_url, {"cmd": "cat {}".format(full_file_path)}) + return self.post_request(run_request_url, {"cmd": f"cat {full_file_path}"}) def process_container(self, run_request_url): service_account_token = self.cat_command(run_request_url, "/var/run/secrets/kubernetes.io/serviceaccount/token") @@ -609,7 +606,7 @@ class ProveAnonymousAuth(ActiveHunter): for container_data in pod_data["spec"]["containers"]: container_name = container_data["name"] - run_request_url = self.base_url + "run/{}/{}/{}".format(pod_namespace, pod_id, container_name) + run_request_url = self.base_url + f"run/{pod_namespace}/{pod_id}/{container_name}" extracted_data = self.process_container(run_request_url) @@ -618,11 +615,11 @@ class ProveAnonymousAuth(ActiveHunter): environment_variables = extracted_data["environment_variables"] temp_message += ( - "\n\nPod namespace: {}".format(pod_namespace) - + "\n\nPod ID: {}".format(pod_id) - + "\n\nContainer name: {}".format(container_name) - + "\n\nService account token: {}".format(service_account_token) - + "\nEnvironment variables: {}".format(environment_variables) + f"\n\nPod namespace: {pod_namespace}" + + f"\n\nPod ID: {pod_id}" + + f"\n\nContainer name: {container_name}" + + f"\n\nService account token: {service_account_token}" + + f"\nEnvironment variables: {environment_variables}" ) first_check = container_data.get("securityContext", {}).get("privileged") @@ -647,7 +644,7 @@ class ProveAnonymousAuth(ActiveHunter): if temp_message: message = "The following containers have been successfully breached." + temp_message - self.event.evidence = "{}".format(message) + self.event.evidence = f"{message}" if exposed_existing_privileged_containers: self.publish_event( @@ -667,7 +664,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): def __init__(self, event, seconds_to_wait_for_os_command=1): self.event = event - self.base_url = "https://{host}:10250/".format(host=self.event.host) + self.base_url = f"https://{self.event.host}:10250/" self.seconds_to_wait_for_os_command = seconds_to_wait_for_os_command self.number_of_rm_attempts = 5 self.number_of_rmdir_attempts = 5 @@ -686,7 +683,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): return "Exception: " + str(ex) def cat_command(self, run_request_url, full_file_path): - return self.post_request(run_request_url, {"cmd": "cat {}".format(full_file_path)}) + return self.post_request(run_request_url, {"cmd": f"cat {full_file_path}"}) def clean_attacked_exposed_existing_privileged_container( self, @@ -702,7 +699,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): self.rm_command( run_request_url, - "{}/etc/cron.daily/{}".format(directory_created, file_created), + f"{directory_created}/etc/cron.daily/{file_created}", number_of_rm_attempts, seconds_to_wait_for_os_command, ) @@ -730,9 +727,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): def rm_command(self, run_request_url, file_to_remove, number_of_rm_attempts, seconds_to_wait_for_os_command): if self.check_file_exists(run_request_url, file_to_remove): for _ in range(number_of_rm_attempts): - command_execution_outcome = self.post_request( - run_request_url, {"cmd": "rm -f {}".format(file_to_remove)} - ) + command_execution_outcome = self.post_request(run_request_url, {"cmd": f"rm -f {file_to_remove}"}) if seconds_to_wait_for_os_command: time.sleep(seconds_to_wait_for_os_command) @@ -759,10 +754,10 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): return False def chmod_command(self, run_request_url, permissions, file): - return self.post_request(run_request_url, {"cmd": "chmod {} {}".format(permissions, file)}) + return self.post_request(run_request_url, {"cmd": f"chmod {permissions} {file}"}) def touch_command(self, run_request_url, file_to_create): - return self.post_request(run_request_url, {"cmd": "touch {}".format(file_to_create)}) + return self.post_request(run_request_url, {"cmd": f"touch {file_to_create}"}) def attack_exposed_existing_privileged_container( self, run_request_url, directory_created, number_of_rm_attempts, seconds_to_wait_for_os_command, file_name=None @@ -770,7 +765,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): if file_name is None: file_name = "kube-hunter" + str(uuid.uuid1()) - file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name) + file_name_with_path = f"{directory_created}/etc/cron.daily/{file_name}" file_created = self.touch_command(run_request_url, file_name_with_path) @@ -798,9 +793,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): ): if self.check_directory_exists(run_request_url, directory_to_remove): for _ in range(number_of_rmdir_attempts): - command_execution_outcome = self.post_request( - run_request_url, {"cmd": "rmdir {}".format(directory_to_remove)} - ) + command_execution_outcome = self.post_request(run_request_url, {"cmd": f"rmdir {directory_to_remove}"}) if seconds_to_wait_for_os_command: time.sleep(seconds_to_wait_for_os_command) @@ -827,7 +820,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): return False def ls_command(self, run_request_url, file_or_directory): - return self.post_request(run_request_url, {"cmd": "ls {}".format(file_or_directory)}) + return self.post_request(run_request_url, {"cmd": f"ls {file_or_directory}"}) def umount_command( self, @@ -845,7 +838,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): for _ in range(number_of_umount_attempts): # Ref: http://man7.org/linux/man-pages/man2/umount.2.html command_execution_outcome = self.post_request( - run_request_url, {"cmd": "umount {} {}".format(file_system_or_partition, directory)} + run_request_url, {"cmd": f"umount {file_system_or_partition} {directory}"} ) if seconds_to_wait_for_os_command: @@ -876,16 +869,16 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): def mount_command(self, run_request_url, file_system_or_partition, directory): # Ref: http://man7.org/linux/man-pages/man1/mkdir.1.html - return self.post_request(run_request_url, {"cmd": "mount {} {}".format(file_system_or_partition, directory)}) + return self.post_request(run_request_url, {"cmd": f"mount {file_system_or_partition} {directory}"}) def mkdir_command(self, run_request_url, directory_to_create): # Ref: http://man7.org/linux/man-pages/man1/mkdir.1.html - return self.post_request(run_request_url, {"cmd": "mkdir {}".format(directory_to_create)}) + return self.post_request(run_request_url, {"cmd": f"mkdir {directory_to_create}"}) def findfs_command(self, run_request_url, file_system_or_partition_type, file_system_or_partition): # Ref: http://man7.org/linux/man-pages/man8/findfs.8.html return self.post_request( - run_request_url, {"cmd": "findfs {}{}".format(file_system_or_partition_type, file_system_or_partition)} + run_request_url, {"cmd": f"findfs {file_system_or_partition_type}{file_system_or_partition}"} ) def get_root_values(self, command_line): @@ -944,9 +937,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): ) if ProveAnonymousAuth.has_no_error_nor_exception(mounted_file_system_or_partition): - host_name = self.cat_command( - run_request_url, "{}/etc/hostname".format(directory_created) - ) + host_name = self.cat_command(run_request_url, f"{directory_created}/etc/hostname") if ProveAnonymousAuth.has_no_error_nor_exception(host_name): return { @@ -980,7 +971,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): pod_id = exposed_existing_privileged_containers["pod_id"] container_name = exposed_existing_privileged_containers["container_name"] - run_request_url = self.base_url + "run/{}/{}/{}".format(pod_namespace, pod_id, container_name) + run_request_url = self.base_url + f"run/{pod_namespace}/{pod_id}/{container_name}" is_exposed_existing_privileged_container_privileged = self.process_exposed_existing_privileged_container( run_request_url, @@ -1030,7 +1021,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): + temp_message ) - self.event.evidence = "{}".format(message) + self.event.evidence = f"{message}" else: message = ( "The following exposed existing privileged containers" @@ -1039,7 +1030,7 @@ class MaliciousIntentViaSecureKubeletPort(ActiveHunter): + temp_message ) - self.event.evidence = "{}".format(message) + self.event.evidence = f"{message}" @handler.subscribe(ExposedRunHandler) diff --git a/kube_hunter/modules/hunting/mounts.py b/kube_hunter/modules/hunting/mounts.py index ebd730f..ccbb0d2 100644 --- a/kube_hunter/modules/hunting/mounts.py +++ b/kube_hunter/modules/hunting/mounts.py @@ -32,7 +32,7 @@ class WriteMountToVarLog(Vulnerability, Event): vid="KHV047", ) self.pods = pods - self.evidence = "pods: {}".format(", ".join((pod["metadata"]["name"] for pod in self.pods))) + self.evidence = "pods: {}".format(", ".join(pod["metadata"]["name"] for pod in self.pods)) class DirectoryTraversalWithKubelet(Vulnerability, Event): @@ -47,7 +47,7 @@ class DirectoryTraversalWithKubelet(Vulnerability, Event): category=PrivilegeEscalation, ) self.output = output - self.evidence = "output: {}".format(self.output) + self.evidence = f"output: {self.output}" @handler.subscribe(ExposedPodsHandler) diff --git a/kube_hunter/modules/report/base.py b/kube_hunter/modules/report/base.py index 1652e64..d8fed44 100644 --- a/kube_hunter/modules/report/base.py +++ b/kube_hunter/modules/report/base.py @@ -7,6 +7,9 @@ from kube_hunter.modules.report.collector import ( vulnerabilities_lock, ) +BASE_KB_LINK = "https://avd.aquasec.com/" +FULL_KB_LINK = "https://avd.aquasec.com/kube-hunter/{vid}/" + class BaseReporter: def get_nodes(self): @@ -38,6 +41,7 @@ class BaseReporter: "vulnerability": vuln.get_name(), "description": vuln.explain(), "evidence": str(vuln.evidence), + "avd_reference": FULL_KB_LINK.format(vid=vuln.get_vid().lower()), "hunter": vuln.hunter.get_name(), } for vuln in vulnerabilities @@ -63,6 +67,4 @@ class BaseReporter: if statistics: report["hunter_statistics"] = self.get_hunter_statistics() - report["kburl"] = "https://aquasecurity.github.io/kube-hunter/kb/{vid}" - return report diff --git a/kube_hunter/modules/report/plain.py b/kube_hunter/modules/report/plain.py index 5e54cb5..b2eb573 100644 --- a/kube_hunter/modules/report/plain.py +++ b/kube_hunter/modules/report/plain.py @@ -1,6 +1,6 @@ from prettytable import ALL, PrettyTable -from kube_hunter.modules.report.base import BaseReporter +from kube_hunter.modules.report.base import BaseReporter, BASE_KB_LINK from kube_hunter.modules.report.collector import ( services, vulnerabilities, @@ -11,7 +11,6 @@ from kube_hunter.modules.report.collector import ( EVIDENCE_PREVIEW = 100 MAX_TABLE_WIDTH = 20 -KB_LINK = "https://github.com/aquasecurity/kube-hunter/tree/master/docs/_kb" class PlainReporter(BaseReporter): @@ -60,7 +59,7 @@ class PlainReporter(BaseReporter): if service.event_id not in id_memory: nodes_table.add_row(["Node/Master", service.host]) id_memory.add(service.event_id) - nodes_ret = "\nNodes\n{}\n".format(nodes_table) + nodes_ret = f"\nNodes\n{nodes_table}\n" services_lock.release() return nodes_ret @@ -114,7 +113,7 @@ class PlainReporter(BaseReporter): return ( "\nVulnerabilities\n" "For further information about a vulnerability, search its ID in: \n" - f"{KB_LINK}\n{vuln_table}\n" + f"{BASE_KB_LINK}\n{vuln_table}\n" ) def hunters_table(self): diff --git a/tests/discovery/test_apiserver.py b/tests/discovery/test_apiserver.py index 5a9a16e..dba49ea 100644 --- a/tests/discovery/test_apiserver.py +++ b/tests/discovery/test_apiserver.py @@ -123,7 +123,7 @@ def test_InsecureApiServer(): # We should only generate an ApiServer event for a response that looks like it came from a Kubernetes node @handler.subscribe(ApiServer) -class testApiServer(object): +class testApiServer: def __init__(self, event): print("Event") assert event.host == "mockKubernetes" diff --git a/tests/discovery/test_hosts.py b/tests/discovery/test_hosts.py index 85b4b10..2c49dec 100644 --- a/tests/discovery/test_hosts.py +++ b/tests/discovery/test_hosts.py @@ -90,7 +90,7 @@ class TestDiscoveryUtils: def test_generate_hosts_valid_ignore(): remove = IPAddress("192.168.1.8") scan = "192.168.1.0/24" - expected = set(ip for ip in IPNetwork(scan) if ip != remove) + expected = {ip for ip in IPNetwork(scan) if ip != remove} actual = set(HostDiscoveryHelpers.generate_hosts([scan, f"!{str(remove)}"])) diff --git a/tests/hunting/test_apiserver_hunter.py b/tests/hunting/test_apiserver_hunter.py index 3318251..7cd6b20 100644 --- a/tests/hunting/test_apiserver_hunter.py +++ b/tests/hunting/test_apiserver_hunter.py @@ -122,7 +122,7 @@ def test_AccessApiServer(): @handler.subscribe(ListNamespaces) -class test_ListNamespaces(object): +class test_ListNamespaces: def __init__(self, event): print("ListNamespaces") assert event.evidence == ["hello"] @@ -135,7 +135,7 @@ class test_ListNamespaces(object): @handler.subscribe(ListPodsAndNamespaces) -class test_ListPodsAndNamespaces(object): +class test_ListPodsAndNamespaces: def __init__(self, event): print("ListPodsAndNamespaces") assert len(event.evidence) == 2 @@ -158,7 +158,7 @@ class test_ListPodsAndNamespaces(object): # Should never see this because the API call in the test returns 403 status code @handler.subscribe(ListRoles) -class test_ListRoles(object): +class test_ListRoles: def __init__(self, event): print("ListRoles") assert 0 @@ -169,7 +169,7 @@ class test_ListRoles(object): # Should only see this when we have a token because the API call returns an empty list of items # in the test where we have no token @handler.subscribe(ListClusterRoles) -class test_ListClusterRoles(object): +class test_ListClusterRoles: def __init__(self, event): print("ListClusterRoles") assert event.auth_token == "so-secret" @@ -178,7 +178,7 @@ class test_ListClusterRoles(object): @handler.subscribe(ServerApiAccess) -class test_ServerApiAccess(object): +class test_ServerApiAccess: def __init__(self, event): print("ServerApiAccess") if event.category == UnauthenticatedAccess: @@ -191,7 +191,7 @@ class test_ServerApiAccess(object): @handler.subscribe(ApiServerPassiveHunterFinished) -class test_PassiveHunterFinished(object): +class test_PassiveHunterFinished: def __init__(self, event): print("PassiveHunterFinished") assert event.namespaces == ["hello"] @@ -276,12 +276,12 @@ def test_AccessApiServerActive(): @handler.subscribe(CreateANamespace) -class test_CreateANamespace(object): +class test_CreateANamespace: def __init__(self, event): assert "abcde" in event.evidence @handler.subscribe(DeleteANamespace) -class test_DeleteANamespace(object): +class test_DeleteANamespace: def __init__(self, event): assert "2019-02-26" in event.evidence diff --git a/tests/hunting/test_certificates.py b/tests/hunting/test_certificates.py index a0a6643..9697545 100644 --- a/tests/hunting/test_certificates.py +++ b/tests/hunting/test_certificates.py @@ -37,6 +37,6 @@ rceJuGsnJEQ= @handler.subscribe(CertificateEmail) -class test_CertificateEmail(object): +class test_CertificateEmail: def __init__(self, event): assert event.email == b"build@nodejs.org0" diff --git a/tests/hunting/test_cvehunting.py b/tests/hunting/test_cvehunting.py index 80fad8e..df5047b 100644 --- a/tests/hunting/test_cvehunting.py +++ b/tests/hunting/test_cvehunting.py @@ -41,7 +41,7 @@ def test_K8sCveHunter(): @handler.subscribe(ServerApiVersionEndPointAccessPE) -class test_CVE_2018_1002105(object): +class test_CVE_2018_1002105: def __init__(self, event): global cve_counter cve_counter += 1 diff --git a/tests/hunting/test_kubelet.py b/tests/hunting/test_kubelet.py index 2d377ba..dcbce44 100644 --- a/tests/hunting/test_kubelet.py +++ b/tests/hunting/test_kubelet.py @@ -270,7 +270,7 @@ def test_proveanonymousauth_connectivity_issues(): @handler.subscribe(ExposedExistingPrivilegedContainersViaSecureKubeletPort) -class ExposedPrivilegedContainersViaAnonymousAuthEnabledInSecureKubeletPortEventCounter(object): +class ExposedPrivilegedContainersViaAnonymousAuthEnabledInSecureKubeletPortEventCounter: def __init__(self, event): global counter counter += 1 @@ -371,9 +371,9 @@ def test_attack_exposed_existing_privileged_container_success(): run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd=" directory_created = "/kube-hunter-mock_" + str(uuid.uuid1()) file_name = "kube-hunter-mock" + str(uuid.uuid1()) - file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name) + file_name_with_path = f"{directory_created}/etc/cron.daily/{file_name}" - session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"touch {file_name_with_path}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""), text="" ) @@ -395,12 +395,12 @@ def test_attack_exposed_existing_privileged_container_failure_when_touch(): with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock: directory_created = "/kube-hunter-mock_" + str(uuid.uuid1()) file_name = "kube-hunter-mock" + str(uuid.uuid1()) - file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name) + file_name_with_path = f"{directory_created}/etc/cron.daily/{file_name}" url = "https://localhost:10250/" run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd=" session_mock.post( - run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), + run_url + urllib.parse.quote(f"touch {file_name_with_path}", safe=""), text="Operation not permitted", ) @@ -420,11 +420,11 @@ def test_attack_exposed_existing_privileged_container_failure_when_chmod(): with requests_mock.Mocker(session=class_being_tested.event.session) as session_mock: directory_created = "/kube-hunter-mock_" + str(uuid.uuid1()) file_name = "kube-hunter-mock" + str(uuid.uuid1()) - file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name) + file_name_with_path = f"{directory_created}/etc/cron.daily/{file_name}" url = "https://localhost:10250/" run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd=" - session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"touch {file_name_with_path}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""), text="Permission denied", @@ -547,12 +547,12 @@ def test_process_exposed_existing_privileged_container_success(): session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline) session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs") - session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"mkdir {directory_created}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text="" ) session_mock.post( - run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""), text="mockhostname" + run_url + urllib.parse.quote(f"cat {directory_created}/etc/hostname", safe=""), text="mockhostname" ) return_value = class_being_tested.process_exposed_existing_privileged_container( @@ -619,9 +619,7 @@ def test_process_exposed_existing_privileged_container_failure_when_mkdir(): session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline) session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs") - session_mock.post( - run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="Permission denied" - ) + session_mock.post(run_url + urllib.parse.quote(f"mkdir {directory_created}", safe=""), text="Permission denied") return_value = class_being_tested.process_exposed_existing_privileged_container( url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu", @@ -644,7 +642,7 @@ def test_process_exposed_existing_privileged_container_failure_when_mount(): session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline) session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs") - session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"mkdir {directory_created}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text="Permission denied", @@ -671,12 +669,12 @@ def test_process_exposed_existing_privileged_container_failure_when_cat_hostname session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline) session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs") - session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"mkdir {directory_created}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text="" ) session_mock.post( - run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""), + run_url + urllib.parse.quote(f"cat {directory_created}/etc/hostname", safe=""), text="Permission denied", ) @@ -699,18 +697,18 @@ def test_maliciousintentviasecurekubeletport_success(): run_url = url + "run/kube-hunter-privileged/kube-hunter-privileged-deployment-86dc79f945-sjjps/ubuntu?cmd=" directory_created = "/kube-hunter-mock_" + str(uuid.uuid1()) file_name = "kube-hunter-mock" + str(uuid.uuid1()) - file_name_with_path = "{}/etc/cron.daily/{}".format(directory_created, file_name) + file_name_with_path = f"{directory_created}/etc/cron.daily/{file_name}" session_mock.post(run_url + urllib.parse.quote("cat /proc/cmdline", safe=""), text=cat_proc_cmdline) session_mock.post(run_url + urllib.parse.quote("findfs LABEL=Mock", safe=""), text="/dev/mock_fs") - session_mock.post(run_url + urllib.parse.quote("mkdir {}".format(directory_created), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"mkdir {directory_created}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("mount {} {}".format("/dev/mock_fs", directory_created), safe=""), text="" ) session_mock.post( - run_url + urllib.parse.quote("cat {}/etc/hostname".format(directory_created), safe=""), text="mockhostname" + run_url + urllib.parse.quote(f"cat {directory_created}/etc/hostname", safe=""), text="mockhostname" ) - session_mock.post(run_url + urllib.parse.quote("touch {}".format(file_name_with_path), safe=""), text="") + session_mock.post(run_url + urllib.parse.quote(f"touch {file_name_with_path}", safe=""), text="") session_mock.post( run_url + urllib.parse.quote("chmod {} {}".format("755", file_name_with_path), safe=""), text="" )