diff --git a/src/modules/hunting/apiserver.py b/src/modules/hunting/apiserver.py index be96d37..eeed80d 100644 --- a/src/modules/hunting/apiserver.py +++ b/src/modules/hunting/apiserver.py @@ -17,45 +17,51 @@ from ...core.types import RemoteCodeExec, AccessRisk, InformationDisclosure, Una class ServerApiAccess(Vulnerability, Event): """ The API Server port is accessible. Depending on your RBAC settings this could expose access to or control of your cluster. """ - def __init__(self, evidence, category): - Vulnerability.__init__(self, KubernetesCluster, name="Access to server API", category=category) + def __init__(self, evidence, using_token): + if using_token: + name = "Access to API using service account token" + category = InformationDisclosure + else: + name = "Unauthenticated access to API" + category = UnauthenticatedAccess + Vulnerability.__init__(self, KubernetesCluster, name=name, category=category) self.evidence = evidence +class ApiInfoDisclosure(Vulnerability, Event): + def __init__(self, evidence, using_token, name): + if using_token: + name +=" using service account token" + else: + name +=" as anonymous user" + Vulnerability.__init__(self, KubernetesCluster, name=name, category=InformationDisclosure) + self.evidence = evidence -class ListPodsAndNamespaces(Vulnerability, Event): +class ListPodsAndNamespaces(ApiInfoDisclosure): """ Accessing pods might give an attacker valuable information""" - def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing pods", - category=InformationDisclosure) - self.evidence = evidence + def __init__(self, evidence, using_token): + ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing pods") -class ListNamespaces(Vulnerability, Event): +class ListNamespaces(ApiInfoDisclosure): """ Accessing namespaces might give an attacker valuable information """ - def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing namespaces", - category=InformationDisclosure) - self.evidence = evidence + def __init__(self, evidence, using_token): + ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing namespaces") -class ListRoles(Vulnerability, Event): +class ListRoles(ApiInfoDisclosure): """ Accessing roles might give an attacker valuable information """ - def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing roles", - category=InformationDisclosure) - self.evidence = evidence + def __init__(self, evidence, using_token): + ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing roles") -class ListClusterRoles(Vulnerability, Event): +class ListClusterRoles(ApiInfoDisclosure): """ Accessing cluster roles might give an attacker valuable information """ - def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing cluster roles", - category=InformationDisclosure) - self.evidence = evidence + def __init__(self, evidence, using_token): + ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing cluster roles") class CreateANamespace(Vulnerability, Event): @@ -191,6 +197,7 @@ class AccessApiServer(Hunter): self.path = "https://{}:{}".format(self.event.host, self.event.port) self.headers = {} self.category = UnauthenticatedAccess + self.with_token = False def access_api_server(self): logging.debug('Passive Hunter is attempting to access the API at {host}:{port}'.format(host=self.event.host, @@ -250,23 +257,24 @@ class AccessApiServer(Hunter): def execute(self): api = self.access_api_server() if api: - self.publish_event(ServerApiAccess(api, self.category)) + self.publish_event(ServerApiAccess(api, self.with_token)) namespaces = self.get_namespaces() if namespaces: - self.publish_event(ListNamespaces(namespaces)) + self.publish_event(ListNamespaces(namespaces, self.with_token)) pods = self.get_pods() if pods: - self.publish_event(ListPodsAndNamespaces(pods)) + print pods + self.publish_event(ListPodsAndNamespaces(pods, self.with_token)) roles = self.get_roles() if roles: - self.publish_event(ListRoles(roles)) + self.publish_event(ListRoles(roles, self.with_token)) cluster_roles = self.get_cluster_roles() if cluster_roles: - self.publish_event(ListClusterRoles(cluster_roles)) + self.publish_event(ListClusterRoles(cluster_roles, self.with_token)) # If we have a service account token, this event should get triggered twice - once with and once without # the token @@ -283,6 +291,7 @@ class AccessApiServerWithToken(AccessApiServer): assert self.event.auth_token != '' self.headers = {'Authorization': 'Bearer ' + self.event.auth_token} self.category = InformationDisclosure + self.with_token = True # Active Hunter diff --git a/tests/hunting/test_apiserver_hunter.py b/tests/hunting/test_apiserver_hunter.py index 210b6fc..a7d88f4 100644 --- a/tests/hunting/test_apiserver_hunter.py +++ b/tests/hunting/test_apiserver_hunter.py @@ -99,8 +99,12 @@ class test_ListPodsAndNamespaces(object): assert pod["namespace"] == "namespaceB" if event.host == "mockKubernetesToken": assert event.auth_token == "so-secret" + assert "token" in event.name + assert "anon" not in event.name else: assert event.auth_token is None + assert "token" not in event.name + assert "anon" in event.name global counter counter += 1