Compare commits

...

13 Commits

Author SHA1 Message Date
danielsagi
3df7ea85bc fixed linting issues 2022-05-28 14:51:43 -07:00
danielsagi
2fac662c51 Refactored 'quick' option to 'full-cloud-scan'. now negating this behaviour. default to always scan /24 CIDR 2022-05-28 14:50:26 -07:00
danielsagi
efc6bb69f2 Fixed bug in apiVersions 2022-05-28 14:24:47 -07:00
danielsagi
0119a1cb3f added evidence to azure metadata exposure 2022-05-28 13:50:30 -07:00
danielsagi
8b1e7c7454 fixed typo on extract_subnets in azure scanning 2022-05-28 13:33:17 -07:00
danielsagi
dcfd733cbe fixed IPAddress to < conversion failure 2022-05-28 13:27:29 -07:00
danielsagi
a1b218eb7b removed unessesary prints in apiserver tests 2022-05-28 13:14:52 -07:00
danielsagi
631721ae02 fixed linting issues 2022-05-28 13:11:47 -07:00
danielsagi
f674a99675 added azure subnet extraction as a new hunter AzureSubnetDiscovery. also added specific tests 2022-05-28 12:36:47 -07:00
danielsagi
f0a10c4e65 fixed all azure tests 2022-05-26 08:52:43 -07:00
danielsagi
015c5ae00f WIP: added tests for AKS metadata api discovery 2022-05-26 08:26:34 -07:00
danielsagi
4afbe5d6de WIP: moved registered AKS hunting to multiple subscribe, added Azure version api extraction 2022-05-20 01:38:46 -07:00
Daniel Sagi
5b94375903 initial refacotr, moved all aws logic to seperate module 2022-05-19 18:04:08 +03:00
21 changed files with 590 additions and 456 deletions

View File

@@ -40,7 +40,7 @@ Table of Contents
- [Output](#output) - [Output](#output)
- [Dispatching](#dispatching) - [Dispatching](#dispatching)
- [Advanced Usage](#advanced-usage) - [Advanced Usage](#advanced-usage)
- [Azure Quick Scanning](#azure-quick-scanning) - [Full Cloud Scan](#full-cloud-scan)
- [Custom Hunting](#custom-hunting) - [Custom Hunting](#custom-hunting)
- [Deployment](#deployment) - [Deployment](#deployment)
- [On Machine](#on-machine) - [On Machine](#on-machine)
@@ -159,9 +159,9 @@ Available dispatch methods are:
## Advanced Usage ## Advanced Usage
### Azure Quick Scanning ### Full Cloud Scan
When running **as a Pod in an Azure or AWS environment**, kube-hunter will fetch subnets from the Instance Metadata Service. Naturally this makes the discovery process take longer. When running **as a Pod in an Azure or AWS environment**, kube-hunter will fetch subnets from the Instance Metadata Service. Naturally this will make the discovery process take longer. This is why, by default, kube-hunter hard-limits scanning to a `/24` CIDR.
To hardlimit subnet scanning to a `/24` CIDR, use the `--quick` option. If you wish to remove this limit and scan whatever subnets kube-hunter discovers, you can use the `--full-cloud-scan` option.
### Custom Hunting ### Custom Hunting
Custom hunting enables advanced users to have control over what hunters gets registered at the start of a hunt. Custom hunting enables advanced users to have control over what hunters gets registered at the start of a hunt.
@@ -183,7 +183,6 @@ kube-hunter --active --list --raw-hunter-names
**Notice**: Due to kube-hunter's architectural design, the following "Core Hunters/Classes" will always register (even when using custom hunting): **Notice**: Due to kube-hunter's architectural design, the following "Core Hunters/Classes" will always register (even when using custom hunting):
* HostDiscovery * HostDiscovery
* _Generates ip addresses for the hunt by given configurations_ * _Generates ip addresses for the hunt by given configurations_
* _Automatically discovers subnets using cloud Metadata APIs_
* FromPodHostDiscovery * FromPodHostDiscovery
* _Auto discover attack surface ip addresses for the hunt by using Pod based environment techniques_ * _Auto discover attack surface ip addresses for the hunt by using Pod based environment techniques_
* _Automatically discovers subnets using cloud Metadata APIs_ * _Automatically discovers subnets using cloud Metadata APIs_

View File

@@ -1 +1 @@
kube_hunter/__main__.py kube_hunter / __main__.py

View File

@@ -24,7 +24,7 @@ config = Config(
network_timeout=args.network_timeout, network_timeout=args.network_timeout,
num_worker_threads=args.num_worker_threads, num_worker_threads=args.num_worker_threads,
pod=args.pod, pod=args.pod,
quick=args.quick, full_cloud_scan=args.full_cloud_scan,
remote=args.remote, remote=args.remote,
statistics=args.statistics, statistics=args.statistics,
k8s_auto_discover_nodes=args.k8s_auto_discover_nodes, k8s_auto_discover_nodes=args.k8s_auto_discover_nodes,

View File

@@ -22,7 +22,7 @@ class Config:
- network_timeout: Timeout for network operations - network_timeout: Timeout for network operations
- num_worker_threads: Add a flag --threads to change the default 800 thread count of the event handler - num_worker_threads: Add a flag --threads to change the default 800 thread count of the event handler
- pod: From pod scanning mode - pod: From pod scanning mode
- quick: Quick scanning mode - full_cloud_scan: disables limit for 24 cidr in cloud envs
- remote: Hosts to scan - remote: Hosts to scan
- report: Output format - report: Output format
- statistics: Include hunters statistics - statistics: Include hunters statistics
@@ -39,7 +39,7 @@ class Config:
network_timeout: float = 5.0 network_timeout: float = 5.0
num_worker_threads: int = 800 num_worker_threads: int = 800
pod: bool = False pod: bool = False
quick: bool = False full_cloud_scan: bool = False
remote: Optional[str] = None remote: Optional[str] = None
reporter: Optional[Any] = None reporter: Optional[Any] = None
statistics: bool = False statistics: bool = False

View File

@@ -18,7 +18,11 @@ def parser_add_arguments(parser):
parser.add_argument("--pod", action="store_true", help="Set hunter as an insider pod") parser.add_argument("--pod", action="store_true", help="Set hunter as an insider pod")
parser.add_argument("--quick", action="store_true", help="Prefer quick scan (subnet 24)") parser.add_argument(
"--full-cloud-scan",
action="store_true",
help="Disable hardlimit of '/24' CIDR when scraping Instance Metadata API",
)
parser.add_argument( parser.add_argument(
"--include-patched-versions", "--include-patched-versions",

View File

@@ -1,6 +1,5 @@
import logging import logging
import threading import threading
import requests
from kube_hunter.conf import get_config from kube_hunter.conf import get_config
from kube_hunter.core.types import KubernetesCluster from kube_hunter.core.types import KubernetesCluster
@@ -180,34 +179,11 @@ class NewHostEvent(Event):
def __init__(self, host, cloud=None): def __init__(self, host, cloud=None):
global event_id_count global event_id_count
self.host = host self.host = host
self.cloud_type = cloud
with event_id_count_lock: with event_id_count_lock:
self.event_id = event_id_count self.event_id = event_id_count
event_id_count += 1 event_id_count += 1
@property
def cloud(self):
if not self.cloud_type:
self.cloud_type = self.get_cloud()
return self.cloud_type
def get_cloud(self):
config = get_config()
try:
logger.debug("Checking whether the cluster is deployed on azure's cloud")
# Leverage 3rd tool https://github.com/blrchen/AzureSpeed for Azure cloud ip detection
result = requests.get(
f"https://api.azurespeed.com/api/region?ipOrUrl={self.host}",
timeout=config.network_timeout,
).json()
return result["cloud"] or "NoCloud"
except requests.ConnectionError:
logger.info("Failed to connect cloud type service", exc_info=True)
except Exception:
logger.warning(f"Unable to check cloud of {self.host}", exc_info=True)
return "NoCloud"
def __str__(self): def __str__(self):
return str(self.host) return str(self.host)

View File

@@ -4,6 +4,10 @@ class KubernetesCluster:
name = "Kubernetes Cluster" name = "Kubernetes Cluster"
class CloudProvider:
name = "Cloud Provider"
class KubectlClient: class KubectlClient:
"""The kubectl client binary is used by the user to interact with the cluster""" """The kubectl client binary is used by the user to interact with the cluster"""
@@ -16,13 +20,19 @@ class Kubelet(KubernetesCluster):
name = "Kubelet" name = "Kubelet"
class AWS(KubernetesCluster): class BareMetal(CloudProvider):
"""AWS Cluster""" """AWS Cluster"""
name = "AWS" name = "Bare Metal Installation"
class Azure(KubernetesCluster): class Azure(CloudProvider):
"""Azure Cluster""" """Azure Cluster"""
name = "Azure" name = "AKS Cluster"
class AWS(CloudProvider):
    """AWS Cluster"""

    name = "EKS Cluster"

View File

@@ -9,3 +9,5 @@ from . import (
ports, ports,
proxy, proxy,
) )
from .cloud import aws, azure

View File

@@ -0,0 +1,177 @@
import logging
import requests
import ipaddress
from kube_hunter.conf import get_config
from kube_hunter.core.events.event_handler import handler
from kube_hunter.core.types import Discovery, AWS
from kube_hunter.core.events.types import Event, Vulnerability, NewHostEvent
from kube_hunter.core.types.vulnerabilities import InstanceMetadataApiTechnique
from kube_hunter.modules.discovery.hosts import RunningAsPodEvent
logger = logging.getLogger(__name__)
class AWSMetadataApiExposed(Vulnerability, Event):
    """Access to the AWS Metadata API exposes information about the machines associated with the cluster"""

    def __init__(self, version):
        # version: the Instance Metadata Service version found accessible ("1" or "2")
        Vulnerability.__init__(
            self,
            AWS,
            "AWS Metadata Exposure",
            category=InstanceMetadataApiTechnique,
            vid="KHV053",
        )
        self.version = version
        # Surface the accessible IMDS version in the report, mirroring
        # AzureMetadataApiExposed which sets `evidence` for its finding.
        self.evidence = f"version: {version}"
class InstanceMetadataApi:
    """Endpoints and helpers for the AWS Instance Metadata Service (IMDS) v1/v2."""

    URL = "http://169.254.169.254/latest/meta-data/"
    GET_MACS_URL = "http://169.254.169.254/latest/meta-data/mac"
    LIST_CIDR_URL = (
        "http://169.254.169.254/latest/meta-data/network/interfaces/macs/{mac_address}/subnet-ipv4-cidr-block"
    )
    # BUG FIX: the documented IMDSv2 token endpoint is /latest/api/token (no trailing slash),
    # and the header names are "X-aws-ec2-metadata-token[-ttl-seconds]" — the previous
    # "metatadata" spelling meant v2 requests were never authenticated.
    V2_REQUEST_TOKEN_URL = "http://169.254.169.254/latest/api/token"
    V2_REQUEST_TOKEN_HEADER = {"X-aws-ec2-metadata-token-ttl-seconds": "21600"}
    V2_TOKEN_HEADER_NAME = "X-aws-ec2-metadata-token"

    @classmethod
    def get_api_token(cls, network_timeout):
        """Request an IMDSv2 session token (HTTP PUT) and return it as text."""
        return requests.put(
            cls.V2_REQUEST_TOKEN_URL,
            headers=cls.V2_REQUEST_TOKEN_HEADER,
            timeout=network_timeout,
        ).text

    @classmethod
    def ping_v1(cls, network_timeout):
        """Return True if the unauthenticated IMDSv1 root endpoint answers 200."""
        status = requests.get(cls.URL, timeout=network_timeout).status_code
        return status == requests.codes.OK

    @classmethod
    def ping_v2(cls, token, network_timeout):
        """Return True if the token-authenticated IMDSv2 root endpoint answers 200."""
        status = requests.get(
            cls.URL,
            headers={cls.V2_TOKEN_HEADER_NAME: token},
            timeout=network_timeout,
        ).status_code
        return status == requests.codes.OK
@handler.subscribe(RunningAsPodEvent)
class AWSMetadataAPIDiscovery(Discovery):
    """AWS Metadata API Discovery
    Pings all metadata api versions and determines if they are accessible from the Pod
    """

    def __init__(self, event):
        self.event = event

    def execute(self):
        config = get_config()
        if self.check_metadata_v1(config.network_timeout):
            self.publish_event(AWSMetadataApiExposed(version="1"))
        if self.check_metadata_v2(config.network_timeout):
            self.publish_event(AWSMetadataApiExposed(version="2"))

    def check_metadata_v1(self, network_timeout):
        """Method checks if the metadata version v1 service is up and accessible from the pod"""
        try:
            logger.debug("From pod attempting to access AWS Metadata v1 API")
            return InstanceMetadataApi.ping_v1(network_timeout)
        except requests.exceptions.ConnectionError:
            logger.debug("Failed to connect to AWS metadata server v1")
        except Exception:
            logger.debug("Unknown error when trying to connect to AWS metadata v1 API")
        return False

    def check_metadata_v2(self, network_timeout):
        """Method checks if the metadata version v2 service is up and accessible from the pod"""
        try:
            logger.debug("From pod attempting to access AWS Metadata v2 API")
            # BUG FIX: get_api_token requires the network timeout; calling it with no
            # arguments raised TypeError (swallowed by the broad except below),
            # so v2 exposure was never detected.
            token = InstanceMetadataApi.get_api_token(network_timeout)
            return InstanceMetadataApi.ping_v2(token, network_timeout)
        except requests.exceptions.ConnectionError:
            logger.debug("Failed to connect AWS metadata server v2")
        except Exception:
            logger.debug("Unknown error when trying to connect to AWS metadata v2 API")
        return False
@handler.subscribe(AWSMetadataApiExposed)
class AWSMetadataHostsDiscovery(Discovery):
    """AWS Metadata Hosts Discovery
    Scrapes the metadata api for additional accessible network subnets for kube-hunter to scan
    """

    def __init__(self, event):
        self.event = event

    def execute(self):
        config = get_config()
        # Extracting network subnet from metadata api.
        # BUG FIX: default to None so an unexpected version value cannot leave
        # `network` unbound (previously a NameError for versions other than "1"/"2").
        network = None
        if self.event.version == "1":
            network = self.extract_network_subnet_v1(config.network_timeout)
        elif self.event.version == "2":
            network = self.extract_network_subnet_v2(config.network_timeout)
        if network:
            if not config.full_cloud_scan:
                # Fallback to the default /24 subnet around the network address.
                # strict=False tolerates host bits when the original prefix was
                # longer than /24 (e.g. a /25 base address clamped to /24).
                network = ipaddress.IPv4Network(f"{network.network_address}/24", strict=False)
            for ip in network:
                self.publish_event(NewHostEvent(host=ip))

    def extract_network_subnet_v1(self, network_timeout):
        """Extract network subnet from aws metadata api v1"""
        logger.debug("From pod attempting to access aws's metadata v1")
        mac_address = requests.get(InstanceMetadataApi.GET_MACS_URL, timeout=network_timeout).text
        logger.debug(f"Extracted mac from aws's metadata v1: {mac_address}")
        cidr_get_url = InstanceMetadataApi.LIST_CIDR_URL.format(mac_address=mac_address)
        cidr = requests.get(cidr_get_url, timeout=network_timeout).text
        logger.debug(f"Extracted cidr block from aws's metadata v1: {cidr}")
        try:
            return ipaddress.IPv4Network(cidr.strip())
        except Exception as x:
            logger.debug(f"ERROR: could not parse cidr from aws metadata api: {cidr} - {x}")
        return None

    def extract_network_subnet_v2(self, network_timeout):
        """Extract network subnet from aws metadata api v2"""
        logger.debug("From pod attempting to access aws's metadata v2")
        # BUG FIX: get_api_token requires the network timeout (was called with no args)
        token = InstanceMetadataApi.get_api_token(network_timeout)
        auth_header = {InstanceMetadataApi.V2_TOKEN_HEADER_NAME: token}
        mac_address = requests.get(
            InstanceMetadataApi.GET_MACS_URL,
            headers=auth_header,
            timeout=network_timeout,
        ).text
        cidr_get_url = InstanceMetadataApi.LIST_CIDR_URL.format(mac_address=mac_address)
        # BUG FIX: keep the raw "a.b.c.d/prefix" string; the previous .split("/")
        # produced a list whose .strip() always raised, so v2 extraction never worked.
        cidr = requests.get(
            cidr_get_url,
            headers=auth_header,
            timeout=network_timeout,
        ).text
        try:
            return ipaddress.IPv4Network(cidr.strip())
        except Exception as x:
            logger.debug(f"ERROR: could not parse cidr from aws metadata api: {cidr} - {x}")
        return None

View File

@@ -0,0 +1,130 @@
import logging
import requests
from kube_hunter.conf import get_config
from kube_hunter.core.types import Discovery
from kube_hunter.core.types.components import Azure
from kube_hunter.core.events.types import Vulnerability, Event, InstanceMetadataApiTechnique
from kube_hunter.core.events.event_handler import handler
from kube_hunter.modules.discovery.hosts import RunningAsPodEvent, NewHostEvent
from ipaddress import IPv4Network
logger = logging.getLogger(__name__)
class AzureMetadataApiExposed(Vulnerability, Event):
    """Access to the Azure Metadata API exposes information about the machines associated with the cluster"""

    def __init__(self, versions_info):
        # Mapping of every reachable IMDS api version to the instance data it returned.
        self.versions_info = versions_info
        Vulnerability.__init__(
            self,
            Azure,
            "Azure Metadata Exposure",
            category=InstanceMetadataApiTechnique,
            vid="KHV003",
        )
        exposed_versions = ",".join(self.versions_info.keys())
        self.evidence = "apiVersions: {}".format(exposed_versions)
class AzureInstanceMetadataService:
    """Thin client for the Azure Instance Metadata Service (IMDS) link-local endpoint."""

    ROOT = "http://169.254.169.254/metadata/"
    VERSIONS_ENDPOINT = "versions"
    INSTANCE_ENDPOINT = "instance"
    VERSION_PARAMETER = "api-version"
    REQUEST_TOKEN_HEADER = {"Metadata": "true"}

    @classmethod
    def get_versions(cls, network_timeout):
        """Fetch the IMDS versions document; returns the parsed JSON dict or False on failure."""
        versions_url = cls.ROOT + cls.VERSIONS_ENDPOINT
        try:
            response = requests.get(
                versions_url,
                headers=cls.REQUEST_TOKEN_HEADER,
                timeout=network_timeout,
            )
            return response.json()
        except requests.exceptions.ConnectionError:
            logger.debug("Failed to connect Azure metadata server")
        except Exception:
            logger.debug("Unknown error when trying to connect to Azure metadata server")
        return False

    @classmethod
    def get_instance_data(cls, api_version, network_timeout):
        """Fetch the IMDS instance document for *api_version*; parsed JSON dict or False on failure."""
        instance_url = cls.ROOT + cls.INSTANCE_ENDPOINT
        query = {cls.VERSION_PARAMETER: api_version}
        try:
            response = requests.get(
                instance_url,
                params=query,
                headers=cls.REQUEST_TOKEN_HEADER,
                timeout=network_timeout,
            )
            return response.json()
        except requests.exceptions.ConnectionError:
            logger.debug("Failed to connect Azure metadata server")
        except Exception:
            logger.debug("Unknown error when trying to connect to Azure metadata server")
        return False
@handler.subscribe(RunningAsPodEvent)
class AzureInstanceMetadataServiceDiscovery(Discovery):
    """Checks from inside the pod whether the Azure IMDS is reachable and, if so,
    collects instance data for every advertised api version."""

    def __init__(self, event):
        self.event = event

    def execute(self):
        timeout = get_config().network_timeout
        logger.debug("Trying to access IMDS (Azure Metadata Service) from pod")
        available_versions = AzureInstanceMetadataService.get_versions(network_timeout=timeout)
        if not available_versions:
            logger.debug("IMDS not available")
            return
        # Collect instance data per api version; versions whose query fails are skipped.
        versions_info = {}
        for version in available_versions["apiVersions"]:
            instance_data = AzureInstanceMetadataService.get_instance_data(
                api_version=version, network_timeout=timeout
            )
            if not instance_data:
                continue
            logger.debug(f"Successfully extracted IMDS apiVersion {version} instance data")
            versions_info[version] = instance_data
        self.publish_event(AzureMetadataApiExposed(versions_info=versions_info))
@handler.subscribe(AzureMetadataApiExposed)
class AzureSubnetsDiscovery(Discovery):
    """Azure Subnets Discovery
    Extracts the pod's subnet from the exposed IMDS instance data and publishes
    a NewHostEvent for every address in it.
    """

    def __init__(self, event):
        self.event = event

    def extract_azure_subnet(self):
        """Return "address/prefix" from the first api version with parsable network data, or False.

        Unless full_cloud_scan is enabled, the prefix is clamped to the default /24.
        """
        # default to 24 subnet
        address, prefix = None, "24"
        config = get_config()
        for version, info in self.event.versions_info.items():
            try:
                subnet_info = info["network"]["interface"][0]["ipv4"]["subnet"][0]
                address = subnet_info["address"]
                tmp_prefix = subnet_info["prefix"]
                if not config.full_cloud_scan:
                    # FIX: previous message logged the bare prefix as a "subnet", which was misleading
                    logger.debug(
                        f"Discovered azure subnet prefix /{tmp_prefix} but scanning /{prefix} "
                        "because full_cloud_scan is disabled"
                    )
                else:
                    prefix = tmp_prefix
                return f"{address}/{prefix}"
            except Exception as x:
                logger.debug(f"Skipping azure subnet discovery for version {version}: {x}")
                continue
        return False

    def execute(self):
        subnet = self.extract_azure_subnet()
        if not subnet:
            return
        logger.debug(f"From pod discovered azure subnet {subnet}")
        try:
            # BUG FIX: strict=False tolerates host bits, which occur when a subnet base
            # address with a prefix longer than /24 (e.g. 10.0.0.128/25) is clamped to /24;
            # strict parsing raised ValueError and crashed the hunter.
            network = IPv4Network(subnet, strict=False)
        except ValueError as x:
            logger.debug(f"Could not parse azure subnet {subnet}: {x}")
            return
        for ip in network:
            self.publish_event(NewHostEvent(str(ip)))

View File

@@ -1,17 +1,16 @@
import os import os
import logging import logging
import itertools import itertools
import requests
from enum import Enum from enum import Enum
from netaddr import IPNetwork, IPAddress, AddrFormatError from netaddr import IPNetwork, AddrFormatError
from netifaces import AF_INET, ifaddresses, interfaces, gateways from netifaces import AF_INET, ifaddresses, interfaces, gateways
from kube_hunter.conf import get_config from kube_hunter.conf import get_config
from kube_hunter.modules.discovery.kubernetes_client import list_all_k8s_cluster_nodes from kube_hunter.modules.discovery.kubernetes_client import list_all_k8s_cluster_nodes
from kube_hunter.core.types import Discovery
from kube_hunter.core.events.event_handler import handler from kube_hunter.core.events.event_handler import handler
from kube_hunter.core.events.types import Event, NewHostEvent, Vulnerability from kube_hunter.core.events.types import Event, NewHostEvent
from kube_hunter.core.types import Discovery, AWS, Azure, InstanceMetadataApiTechnique
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -47,36 +46,6 @@ class RunningAsPodEvent(Event):
pass pass
class AWSMetadataApi(Vulnerability, Event):
"""Access to the AWS Metadata API exposes information about the machines associated with the cluster"""
def __init__(self, cidr):
Vulnerability.__init__(
self,
AWS,
"AWS Metadata Exposure",
category=InstanceMetadataApiTechnique,
vid="KHV053",
)
self.cidr = cidr
self.evidence = f"cidr: {cidr}"
class AzureMetadataApi(Vulnerability, Event):
"""Access to the Azure Metadata API exposes information about the machines associated with the cluster"""
def __init__(self, cidr):
Vulnerability.__init__(
self,
Azure,
"Azure Metadata Exposure",
category=InstanceMetadataApiTechnique,
vid="KHV003",
)
self.cidr = cidr
self.evidence = f"cidr: {cidr}"
class HostScanEvent(Event): class HostScanEvent(Event):
def __init__(self, pod=False, active=False, predefined_hosts=None): def __init__(self, pod=False, active=False, predefined_hosts=None):
# flag to specify whether to get actual data from vulnerabilities # flag to specify whether to get actual data from vulnerabilities
@@ -128,16 +97,7 @@ class FromPodHostDiscovery(Discovery):
if config.remote or config.cidr: if config.remote or config.cidr:
self.publish_event(HostScanEvent()) self.publish_event(HostScanEvent())
else: else:
# Discover cluster subnets, we'll scan all these hosts subnets = self.gateway_discovery()
cloud, subnets = None, list()
if self.is_azure_pod():
subnets, cloud = self.azure_metadata_discovery()
elif self.is_aws_pod_v1():
subnets, cloud = self.aws_metadata_v1_discovery()
elif self.is_aws_pod_v2():
subnets, cloud = self.aws_metadata_v2_discovery()
subnets += self.gateway_discovery()
should_scan_apiserver = False should_scan_apiserver = False
if self.event.kubeservicehost: if self.event.kubeservicehost:
@@ -147,166 +107,15 @@ class FromPodHostDiscovery(Discovery):
should_scan_apiserver = False should_scan_apiserver = False
logger.debug(f"From pod scanning subnet {ip}/{mask}") logger.debug(f"From pod scanning subnet {ip}/{mask}")
for ip in IPNetwork(f"{ip}/{mask}"): for ip in IPNetwork(f"{ip}/{mask}"):
self.publish_event(NewHostEvent(host=ip, cloud=cloud)) self.publish_event(NewHostEvent(host=ip))
if should_scan_apiserver: if should_scan_apiserver:
self.publish_event(NewHostEvent(host=IPAddress(self.event.kubeservicehost), cloud=cloud)) self.publish_event(NewHostEvent(host=self.event.kubeservicehost))
def is_aws_pod_v1(self):
config = get_config()
try:
# Instance Metadata Service v1
logger.debug("From pod attempting to access AWS Metadata v1 API")
if (
requests.get(
"http://169.254.169.254/latest/meta-data/",
timeout=config.network_timeout,
).status_code
== 200
):
return True
except requests.exceptions.ConnectionError:
logger.debug("Failed to connect AWS metadata server v1")
except Exception:
logger.debug("Unknown error when trying to connect to AWS metadata v1 API")
return False
def is_aws_pod_v2(self):
config = get_config()
try:
# Instance Metadata Service v2
logger.debug("From pod attempting to access AWS Metadata v2 API")
token = requests.put(
"http://169.254.169.254/latest/api/token/",
headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
timeout=config.network_timeout,
).text
if (
requests.get(
"http://169.254.169.254/latest/meta-data/",
headers={"X-aws-ec2-metatadata-token": token},
timeout=config.network_timeout,
).status_code
== 200
):
return True
except requests.exceptions.ConnectionError:
logger.debug("Failed to connect AWS metadata server v2")
except Exception:
logger.debug("Unknown error when trying to connect to AWS metadata v2 API")
return False
def is_azure_pod(self):
config = get_config()
try:
logger.debug("From pod attempting to access Azure Metadata API")
if (
requests.get(
"http://169.254.169.254/metadata/instance?api-version=2017-08-01",
headers={"Metadata": "true"},
timeout=config.network_timeout,
).status_code
== 200
):
return True
except requests.exceptions.ConnectionError:
logger.debug("Failed to connect Azure metadata server")
except Exception:
logger.debug("Unknown error when trying to connect to Azure metadata server")
return False
# for pod scanning # for pod scanning
def gateway_discovery(self): def gateway_discovery(self):
"""Retrieving default gateway of pod, which is usually also a contact point with the host""" """Retrieving default gateway of pod, which is usually also a contact point with the host"""
return [[gateways()["default"][AF_INET][0], "24"]] return [[gateways()["default"][AF_INET][0], "24"]]
# querying AWS's interface metadata api v1 | works only from a pod
def aws_metadata_v1_discovery(self):
config = get_config()
logger.debug("From pod attempting to access aws's metadata v1")
mac_address = requests.get(
"http://169.254.169.254/latest/meta-data/mac",
timeout=config.network_timeout,
).text
logger.debug(f"Extracted mac from aws's metadata v1: {mac_address}")
cidr = requests.get(
f"http://169.254.169.254/latest/meta-data/network/interfaces/macs/{mac_address}/subnet-ipv4-cidr-block",
timeout=config.network_timeout,
).text
logger.debug(f"Trying to extract cidr from aws's metadata v1: {cidr}")
try:
cidr = cidr.split("/")
address, subnet = (cidr[0], cidr[1])
subnet = subnet if not config.quick else "24"
cidr = f"{address}/{subnet}"
logger.debug(f"From pod discovered subnet {cidr}")
self.publish_event(AWSMetadataApi(cidr=cidr))
return [(address, subnet)], "AWS"
except Exception as x:
logger.debug(f"ERROR: could not parse cidr from aws metadata api: {cidr} - {x}")
return [], "AWS"
# querying AWS's interface metadata api v2 | works only from a pod
def aws_metadata_v2_discovery(self):
config = get_config()
logger.debug("From pod attempting to access aws's metadata v2")
token = requests.get(
"http://169.254.169.254/latest/api/token",
headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
timeout=config.network_timeout,
).text
mac_address = requests.get(
"http://169.254.169.254/latest/meta-data/mac",
headers={"X-aws-ec2-metatadata-token": token},
timeout=config.network_timeout,
).text
cidr = requests.get(
f"http://169.254.169.254/latest/meta-data/network/interfaces/macs/{mac_address}/subnet-ipv4-cidr-block",
headers={"X-aws-ec2-metatadata-token": token},
timeout=config.network_timeout,
).text.split("/")
try:
address, subnet = (cidr[0], cidr[1])
subnet = subnet if not config.quick else "24"
cidr = f"{address}/{subnet}"
logger.debug(f"From pod discovered subnet {cidr}")
self.publish_event(AWSMetadataApi(cidr=cidr))
return [(address, subnet)], "AWS"
except Exception as x:
logger.debug(f"ERROR: could not parse cidr from aws metadata api: {cidr} - {x}")
return [], "AWS"
# querying azure's interface metadata api | works only from a pod
def azure_metadata_discovery(self):
config = get_config()
logger.debug("From pod attempting to access azure's metadata")
machine_metadata = requests.get(
"http://169.254.169.254/metadata/instance?api-version=2017-08-01",
headers={"Metadata": "true"},
timeout=config.network_timeout,
).json()
address, subnet = "", ""
subnets = list()
for interface in machine_metadata["network"]["interface"]:
address, subnet = (
interface["ipv4"]["subnet"][0]["address"],
interface["ipv4"]["subnet"][0]["prefix"],
)
subnet = subnet if not config.quick else "24"
logger.debug(f"From pod discovered subnet {address}/{subnet}")
subnets.append([address, subnet if not config.quick else "24"])
self.publish_event(AzureMetadataApi(cidr=f"{address}/{subnet}"))
return subnets, "Azure"
@handler.subscribe(HostScanEvent) @handler.subscribe(HostScanEvent)
class HostDiscovery(Discovery): class HostDiscovery(Discovery):

View File

@@ -4,11 +4,13 @@ import logging
import requests import requests
from kube_hunter.conf import get_config from kube_hunter.conf import get_config
from kube_hunter.modules.hunting.kubelet import ExposedPodsHandler, SecureKubeletPortHunter from kube_hunter.modules.hunting.kubelet import ExposedPodsHandler, ExposedRunHandler
from kube_hunter.core.events.event_handler import handler from kube_hunter.core.events.event_handler import handler
from kube_hunter.core.events.types import Event, Vulnerability from kube_hunter.core.events.types import Event, Vulnerability
from kube_hunter.core.types import Hunter, ActiveHunter, MountServicePrincipalTechnique, Azure from kube_hunter.core.types import Hunter, ActiveHunter, MountServicePrincipalTechnique, Azure
from kube_hunter.modules.discovery.cloud.azure import AzureMetadataApiExposed
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,14 +29,14 @@ class AzureSpnExposure(Vulnerability, Event):
self.evidence = evidence self.evidence = evidence
@handler.subscribe(ExposedPodsHandler, predicate=lambda x: x.cloud_type == "Azure") @handler.subscribe_many([ExposedPodsHandler, AzureMetadataApiExposed])
class AzureSpnHunter(Hunter): class AzureSpnHunter(Hunter):
"""AKS Hunting """AKS Hunting
Hunting Azure cluster deployments using specific known configurations Hunting Azure cluster deployments using specific known configurations
""" """
def __init__(self, event): def __init__(self, event):
self.event = event self.event = event.get_by_class(ExposedPodsHandler)
self.base_url = f"https://{self.event.host}:{self.event.port}" self.base_url = f"https://{self.event.host}:{self.event.port}"
# getting a container that has access to the azure.json file # getting a container that has access to the azure.json file
@@ -68,28 +70,25 @@ class AzureSpnHunter(Hunter):
self.publish_event(AzureSpnExposure(container=container, evidence=evidence)) self.publish_event(AzureSpnExposure(container=container, evidence=evidence))
@handler.subscribe(AzureSpnExposure) @handler.subscribe_many([AzureSpnExposure, ExposedRunHandler])
class ProveAzureSpnExposure(ActiveHunter): class ProveAzureSpnExposure(ActiveHunter):
"""Azure SPN Hunter """Azure SPN Hunter
Gets the azure subscription file on the host by executing inside a container Gets the azure subscription file on the host by executing inside a container
""" """
def __init__(self, event): def __init__(self, events):
self.event = event self.events = events
self.base_url = f"https://{self.event.host}:{self.event.port}" self.exposed_run_event = self.events.get_by_class(ExposedRunHandler)
self.spn_exposure_event = self.events.get_by_class(AzureSpnExposure)
def test_run_capability(self): self.base_url = f"https://{self.event.host}:{self.event.port}"
"""
Uses SecureKubeletPortHunter to test the /run handler
TODO: when multiple event subscription is implemented, use this here to make sure /run is accessible
"""
debug_handlers = SecureKubeletPortHunter.DebugHandlers(path=self.base_url, session=self.event.session, pod=None)
return debug_handlers.test_run_container()
def run(self, command, container): def run(self, command, container):
config = get_config() config = get_config()
run_url = f"{self.base_url}/run/{container['namespace']}/{container['pod']}/{container['name']}" run_url = f"{self.base_url}/run/{container['namespace']}/{container['pod']}/{container['name']}"
return self.event.session.post(run_url, verify=False, params={"cmd": command}, timeout=config.network_timeout) return self.exposed_run_event.session.post(
run_url, verify=False, params={"cmd": command}, timeout=config.network_timeout
)
def get_full_path_to_azure_file(self): def get_full_path_to_azure_file(self):
""" """
@@ -106,10 +105,6 @@ class ProveAzureSpnExposure(ActiveHunter):
return azure_file_path return azure_file_path
def execute(self): def execute(self):
if not self.test_run_capability():
logger.debug("Not proving AzureSpnExposure because /run debug handler is disabled")
return
try: try:
azure_file_path = self.get_full_path_to_azure_file() azure_file_path = self.get_full_path_to_azure_file()
logger.debug(f"trying to access the azure.json at the resolved path: {azure_file_path}") logger.debug(f"trying to access the azure.json at the resolved path: {azure_file_path}")
@@ -120,8 +115,8 @@ class ProveAzureSpnExposure(ActiveHunter):
logger.warning("failed to parse SPN") logger.warning("failed to parse SPN")
else: else:
if "subscriptionId" in subscription: if "subscriptionId" in subscription:
self.event.subscriptionId = subscription["subscriptionId"] self.spn_exposure_event.subscriptionId = subscription["subscriptionId"]
self.event.aadClientId = subscription["aadClientId"] self.spn_exposure_event.aadClientId = subscription["aadClientId"]
self.event.aadClientSecret = subscription["aadClientSecret"] self.spn_exposure_event.aadClientSecret = subscription["aadClientSecret"]
self.event.tenantId = subscription["tenantId"] self.spn_exposure_event.tenantId = subscription["tenantId"]
self.event.evidence = f"subscription: {self.event.subscriptionId}" self.spn_exposure_event.evidence = f"subscription: {self.event.subscriptionId}"

View File

@@ -1,29 +0,0 @@
# flake8: noqa: E402
import requests_mock
import json
from kube_hunter.conf import Config, set_config
set_config(Config())
from kube_hunter.core.events.types import NewHostEvent
def test_presetcloud():
"""Testing if it doesn't try to run get_cloud if the cloud type is already set.
get_cloud(1.2.3.4) will result with an error
"""
expcted = "AWS"
hostEvent = NewHostEvent(host="1.2.3.4", cloud=expcted)
assert expcted == hostEvent.cloud
def test_getcloud():
fake_host = "1.2.3.4"
expected_cloud = "Azure"
result = {"cloud": expected_cloud}
with requests_mock.mock() as m:
m.get(f"https://api.azurespeed.com/api/region?ipOrUrl={fake_host}", text=json.dumps(result))
hostEvent = NewHostEvent(host=fake_host)
assert hostEvent.cloud == expected_cloud

View File

@@ -13,6 +13,8 @@ from kube_hunter.modules.discovery.kubectl import KubectlClientDiscovery
from kube_hunter.modules.discovery.kubelet import KubeletDiscovery from kube_hunter.modules.discovery.kubelet import KubeletDiscovery
from kube_hunter.modules.discovery.ports import PortDiscovery from kube_hunter.modules.discovery.ports import PortDiscovery
from kube_hunter.modules.discovery.proxy import KubeProxy as KubeProxyDiscovery from kube_hunter.modules.discovery.proxy import KubeProxy as KubeProxyDiscovery
from kube_hunter.modules.discovery.cloud.azure import AzureInstanceMetadataServiceDiscovery, AzureSubnetsDiscovery
from kube_hunter.modules.discovery.cloud.aws import AWSMetadataAPIDiscovery, AWSMetadataHostsDiscovery
from kube_hunter.modules.hunting.aks import AzureSpnHunter, ProveAzureSpnExposure from kube_hunter.modules.hunting.aks import AzureSpnHunter, ProveAzureSpnExposure
from kube_hunter.modules.hunting.apiserver import ( from kube_hunter.modules.hunting.apiserver import (
AccessApiServer, AccessApiServer,
@@ -66,6 +68,10 @@ PASSIVE_HUNTERS = {
VarLogMountHunter, VarLogMountHunter,
KubeProxy, KubeProxy,
AccessSecrets, AccessSecrets,
AzureInstanceMetadataServiceDiscovery,
AzureSubnetsDiscovery,
AWSMetadataAPIDiscovery,
AWSMetadataHostsDiscovery,
} }
# if config.enable_cve_hunting: # if config.enable_cve_hunting:

View File

@@ -125,7 +125,6 @@ def test_InsecureApiServer():
@handler.subscribe(ApiServer) @handler.subscribe(ApiServer)
class testApiServer: class testApiServer:
def __init__(self, event): def __init__(self, event):
print("Event")
assert event.host == "mockKubernetes" assert event.host == "mockKubernetes"
global counter global counter
counter += 1 counter += 1

111
tests/discovery/test_aws.py Normal file
View File

@@ -0,0 +1,111 @@
# import requests_mock
# class TestAwsMetadataApiDiscovery:
# @staticmethod
# def _make_aws_response(*data: List[str]) -> str:
# return "\n".join(data)
# def test_is_aws_pod_v1_success(self):
# f = FromPodHostDiscovery(RunningAsPodEvent())
# with requests_mock.Mocker() as m:
# m.get(
# "http://169.254.169.254/latest/meta-data/",
# text=TestFromPodHostDiscovery._make_aws_response(
# "\n".join(
# (
# "ami-id",
# "ami-launch-index",
# "ami-manifest-path",
# "block-device-mapping/",
# "events/",
# "hostname",
# "iam/",
# "instance-action",
# "instance-id",
# "instance-type",
# "local-hostname",
# "local-ipv4",
# "mac",
# "metrics/",
# "network/",
# "placement/",
# "profile",
# "public-hostname",
# "public-ipv4",
# "public-keys/",
# "reservation-id",
# "security-groups",
# "services/",
# )
# ),
# ),
# )
# result = f.is_aws_pod_v1()
# assert result
# def test_is_aws_pod_v2_request_fail(self):
# f = FromPodHostDiscovery(RunningAsPodEvent())
# with requests_mock.Mocker() as m:
# m.put(
# "http://169.254.169.254/latest/api/token/",
# headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
# status_code=404,
# )
# m.get(
# "http://169.254.169.254/latest/meta-data/",
# headers={"X-aws-ec2-metatadata-token": "token"},
# status_code=404,
# )
# result = f.is_aws_pod_v2()
# assert not result
# def test_is_aws_pod_v2_success(self):
# f = FromPodHostDiscovery(RunningAsPodEvent())
# with requests_mock.Mocker() as m:
# m.put(
# "http://169.254.169.254/latest/api/token/",
# headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
# text=TestFromPodHostDiscovery._make_aws_response("token"),
# )
# m.get(
# "http://169.254.169.254/latest/meta-data/",
# headers={"X-aws-ec2-metatadata-token": "token"},
# text=TestFromPodHostDiscovery._make_aws_response(
# "\n".join(
# (
# "ami-id",
# "ami-launch-index",
# "ami-manifest-path",
# "block-device-mapping/",
# "events/",
# "hostname",
# "iam/",
# "instance-action",
# "instance-id",
# "instance-type",
# "local-hostname",
# "local-ipv4",
# "mac",
# "metrics/",
# "network/",
# "placement/",
# "profile",
# "public-hostname",
# "public-ipv4",
# "public-keys/",
# "reservation-id",
# "security-groups",
# "services/",
# )
# ),
# ),
# )
# result = f.is_aws_pod_v2()
# assert result

View File

@@ -0,0 +1,79 @@
import json
import time
import requests_mock
from kube_hunter.core.events.event_handler import handler
from kube_hunter.modules.discovery.hosts import RunningAsPodEvent
from kube_hunter.modules.discovery.cloud.azure import (
AzureInstanceMetadataServiceDiscovery,
AzureMetadataApiExposed,
AzureSubnetsDiscovery,
)
event_counter = 0
def test_TestAzureMetadataApi():
global event_counter
f = AzureInstanceMetadataServiceDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.get("http://169.254.169.254/metadata/versions", status_code=404)
f.execute()
# We expect 0 triggers.because versions returned 404
time.sleep(0.01)
assert event_counter == 0
event_counter = 0
with requests_mock.Mocker() as m:
m.get("http://169.254.169.254/metadata/versions", text=AzureApiResponses.make_versions_response())
m.get(
"http://169.254.169.254/metadata/instance?api-version=2017-08-01",
text=AzureApiResponses.make_instance_response([("192.168.1.0", "24")]),
)
f.execute()
# Expect 1 trigger
time.sleep(0.01)
assert event_counter == 1
event_counter = 0
# Test subnet extraction:
versions_info = {"2017-08-01": AzureApiResponses.make_instance_response([("192.168.0.0", "24")], raw=False)}
asd = AzureSubnetsDiscovery(AzureMetadataApiExposed(versions_info))
assert asd.extract_azure_subnet() == "192.168.0.0/24"
class AzureApiResponses:
@staticmethod
def make_instance_response(subnets, raw=True):
response = {
"network": {
"interface": [
{"ipv4": {"subnet": [{"address": address, "prefix": prefix} for address, prefix in subnets]}}
]
}
}
if raw:
response = json.dumps(response)
return response
@staticmethod
def make_versions_response():
return json.dumps(
{
"apiVersions": [
"2017-08-01",
]
}
)
@handler.subscribe(AzureMetadataApiExposed)
class TestAzureMetadataApiExposed:
def __init__(self, event):
global event_counter
event_counter += 1

View File

@@ -19,156 +19,6 @@ set_config(Config())
class TestFromPodHostDiscovery: class TestFromPodHostDiscovery:
@staticmethod
def _make_azure_response(*subnets: List[tuple]) -> str:
return json.dumps(
{
"network": {
"interface": [
{"ipv4": {"subnet": [{"address": address, "prefix": prefix} for address, prefix in subnets]}}
]
}
}
)
@staticmethod
def _make_aws_response(*data: List[str]) -> str:
return "\n".join(data)
def test_is_azure_pod_request_fail(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", status_code=404)
result = f.is_azure_pod()
assert not result
def test_is_azure_pod_success(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.get(
"http://169.254.169.254/metadata/instance?api-version=2017-08-01",
text=TestFromPodHostDiscovery._make_azure_response(("3.4.5.6", "255.255.255.252")),
)
result = f.is_azure_pod()
assert result
def test_is_aws_pod_v1_request_fail(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.get("http://169.254.169.254/latest/meta-data/", status_code=404)
result = f.is_aws_pod_v1()
assert not result
def test_is_aws_pod_v1_success(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.get(
"http://169.254.169.254/latest/meta-data/",
text=TestFromPodHostDiscovery._make_aws_response(
"\n".join(
(
"ami-id",
"ami-launch-index",
"ami-manifest-path",
"block-device-mapping/",
"events/",
"hostname",
"iam/",
"instance-action",
"instance-id",
"instance-type",
"local-hostname",
"local-ipv4",
"mac",
"metrics/",
"network/",
"placement/",
"profile",
"public-hostname",
"public-ipv4",
"public-keys/",
"reservation-id",
"security-groups",
"services/",
)
),
),
)
result = f.is_aws_pod_v1()
assert result
def test_is_aws_pod_v2_request_fail(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.put(
"http://169.254.169.254/latest/api/token/",
headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
status_code=404,
)
m.get(
"http://169.254.169.254/latest/meta-data/",
headers={"X-aws-ec2-metatadata-token": "token"},
status_code=404,
)
result = f.is_aws_pod_v2()
assert not result
def test_is_aws_pod_v2_success(self):
f = FromPodHostDiscovery(RunningAsPodEvent())
with requests_mock.Mocker() as m:
m.put(
"http://169.254.169.254/latest/api/token/",
headers={"X-aws-ec2-metatadata-token-ttl-seconds": "21600"},
text=TestFromPodHostDiscovery._make_aws_response("token"),
)
m.get(
"http://169.254.169.254/latest/meta-data/",
headers={"X-aws-ec2-metatadata-token": "token"},
text=TestFromPodHostDiscovery._make_aws_response(
"\n".join(
(
"ami-id",
"ami-launch-index",
"ami-manifest-path",
"block-device-mapping/",
"events/",
"hostname",
"iam/",
"instance-action",
"instance-id",
"instance-type",
"local-hostname",
"local-ipv4",
"mac",
"metrics/",
"network/",
"placement/",
"profile",
"public-hostname",
"public-ipv4",
"public-keys/",
"reservation-id",
"security-groups",
"services/",
)
),
),
)
result = f.is_aws_pod_v2()
assert result
def test_execute_scan_cidr(self): def test_execute_scan_cidr(self):
set_config(Config(cidr="1.2.3.4/30")) set_config(Config(cidr="1.2.3.4/30"))
f = FromPodHostDiscovery(RunningAsPodEvent()) f = FromPodHostDiscovery(RunningAsPodEvent())

View File

@@ -8,42 +8,64 @@ import json
set_config(Config()) set_config(Config())
from kube_hunter.modules.hunting.kubelet import ExposedPodsHandler from kube_hunter.modules.hunting.kubelet import ExposedPodsHandler
from kube_hunter.modules.discovery.cloud.azure import AzureMetadataApiExposed
from kube_hunter.modules.hunting.aks import AzureSpnHunter from kube_hunter.modules.hunting.aks import AzureSpnHunter
from kube_hunter.core.events.types import MultipleEventsContainer
def test_AzureSpnHunter(): def test_AzureSpnHunter():
e = ExposedPodsHandler(pods=[]) exposed_pods = ExposedPodsHandler(pods=[])
azure_metadata = AzureMetadataApiExposed(
versions_info={
"2017-08-01": {
"network": {
"interface": [
{
"ipv4": {
"subnet": [
{"address": address, "prefix": prefix}
for address, prefix in [("192.168.1.0", "24")]
]
}
}
]
}
}
}
)
pod_template = '{{"items":[ {{"apiVersion":"v1","kind":"Pod","metadata":{{"name":"etc","namespace":"default"}},"spec":{{"containers":[{{"command":["sleep","99999"],"image":"ubuntu","name":"test","volumeMounts":[{{"mountPath":"/mp","name":"v"}}]}}],"volumes":[{{"hostPath":{{"path":"{}"}},"name":"v"}}]}}}} ]}}' pod_template = '{{"items":[ {{"apiVersion":"v1","kind":"Pod","metadata":{{"name":"etc","namespace":"default"}},"spec":{{"containers":[{{"command":["sleep","99999"],"image":"ubuntu","name":"test","volumeMounts":[{{"mountPath":"/mp","name":"v"}}]}}],"volumes":[{{"hostPath":{{"path":"{}"}},"name":"v"}}]}}}} ]}}'
bad_paths = ["/", "/etc", "/etc/", "/etc/kubernetes", "/etc/kubernetes/azure.json"] bad_paths = ["/", "/etc", "/etc/", "/etc/kubernetes", "/etc/kubernetes/azure.json"]
good_paths = ["/yo", "/etc/yo", "/etc/kubernetes/yo.json"] good_paths = ["/yo", "/etc/yo", "/etc/kubernetes/yo.json"]
for p in bad_paths: for p in bad_paths:
e.pods = json.loads(pod_template.format(p))["items"] exposed_pods.pods = json.loads(pod_template.format(p))["items"]
h = AzureSpnHunter(e) h = AzureSpnHunter(MultipleEventsContainer([azure_metadata, exposed_pods]))
c = h.get_key_container() c = h.get_key_container()
assert c assert c
for p in good_paths: for p in good_paths:
e.pods = json.loads(pod_template.format(p))["items"] exposed_pods.pods = json.loads(pod_template.format(p))["items"]
h = AzureSpnHunter(e) h = AzureSpnHunter(MultipleEventsContainer([azure_metadata, exposed_pods]))
c = h.get_key_container() c = h.get_key_container()
assert c == None assert c == None
pod_no_volume_mounts = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test"}],"volumes":[{"hostPath":{"path":"/whatever"},"name":"v"}]}} ]}' pod_no_volume_mounts = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test"}],"volumes":[{"hostPath":{"path":"/whatever"},"name":"v"}]}} ]}'
e.pods = json.loads(pod_no_volume_mounts)["items"] exposed_pods.pods = json.loads(pod_no_volume_mounts)["items"]
h = AzureSpnHunter(e) h = AzureSpnHunter(MultipleEventsContainer([azure_metadata, exposed_pods]))
c = h.get_key_container() c = h.get_key_container()
assert c == None assert c == None
pod_no_volumes = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test"}]}} ]}' pod_no_volumes = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test"}]}} ]}'
e.pods = json.loads(pod_no_volumes)["items"] exposed_pods.pods = json.loads(pod_no_volumes)["items"]
h = AzureSpnHunter(e) h = AzureSpnHunter(MultipleEventsContainer([azure_metadata, exposed_pods]))
c = h.get_key_container() c = h.get_key_container()
assert c == None assert c == None
pod_other_volume = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test","volumeMounts":[{"mountPath":"/mp","name":"v"}]}],"volumes":[{"emptyDir":{},"name":"v"}]}} ]}' pod_other_volume = '{"items":[ {"apiVersion":"v1","kind":"Pod","metadata":{"name":"etc","namespace":"default"},"spec":{"containers":[{"command":["sleep","99999"],"image":"ubuntu","name":"test","volumeMounts":[{"mountPath":"/mp","name":"v"}]}],"volumes":[{"emptyDir":{},"name":"v"}]}} ]}'
e.pods = json.loads(pod_other_volume)["items"] exposed_pods.pods = json.loads(pod_other_volume)["items"]
h = AzureSpnHunter(e) h = AzureSpnHunter(MultipleEventsContainer([azure_metadata, exposed_pods]))
c = h.get_key_container() c = h.get_key_container()
assert c == None assert c == None

View File

@@ -125,7 +125,6 @@ def test_AccessApiServer():
@handler.subscribe(ListNamespaces) @handler.subscribe(ListNamespaces)
class test_ListNamespaces: class test_ListNamespaces:
def __init__(self, event): def __init__(self, event):
print("ListNamespaces")
assert event.evidence == ["hello"] assert event.evidence == ["hello"]
if event.host == "mocktoken": if event.host == "mocktoken":
assert event.auth_token == "so-secret" assert event.auth_token == "so-secret"
@@ -138,7 +137,6 @@ class test_ListNamespaces:
@handler.subscribe(ListPodsAndNamespaces) @handler.subscribe(ListPodsAndNamespaces)
class test_ListPodsAndNamespaces: class test_ListPodsAndNamespaces:
def __init__(self, event): def __init__(self, event):
print("ListPodsAndNamespaces")
assert len(event.evidence) == 2 assert len(event.evidence) == 2
for pod in event.evidence: for pod in event.evidence:
if pod["name"] == "podA": if pod["name"] == "podA":
@@ -161,7 +159,6 @@ class test_ListPodsAndNamespaces:
@handler.subscribe(ListRoles) @handler.subscribe(ListRoles)
class test_ListRoles: class test_ListRoles:
def __init__(self, event): def __init__(self, event):
print("ListRoles")
assert 0 assert 0
global counter global counter
counter += 1 counter += 1
@@ -172,7 +169,6 @@ class test_ListRoles:
@handler.subscribe(ListClusterRoles) @handler.subscribe(ListClusterRoles)
class test_ListClusterRoles: class test_ListClusterRoles:
def __init__(self, event): def __init__(self, event):
print("ListClusterRoles")
assert event.auth_token == "so-secret" assert event.auth_token == "so-secret"
global counter global counter
counter += 1 counter += 1
@@ -181,7 +177,6 @@ class test_ListClusterRoles:
@handler.subscribe(ServerApiAccess) @handler.subscribe(ServerApiAccess)
class test_ServerApiAccess: class test_ServerApiAccess:
def __init__(self, event): def __init__(self, event):
print("ServerApiAccess")
if event.category == ExposedSensitiveInterfacesTechnique: if event.category == ExposedSensitiveInterfacesTechnique:
assert event.auth_token is None assert event.auth_token is None
else: else:
@@ -194,7 +189,6 @@ class test_ServerApiAccess:
@handler.subscribe(ApiServerPassiveHunterFinished) @handler.subscribe(ApiServerPassiveHunterFinished)
class test_PassiveHunterFinished: class test_PassiveHunterFinished:
def __init__(self, event): def __init__(self, event):
print("PassiveHunterFinished")
assert event.namespaces == ["hello"] assert event.namespaces == ["hello"]
global counter global counter
counter += 1 counter += 1