From dd1ed76dc103defdcd9fd177745d0e2da747cea6 Mon Sep 17 00:00:00 2001 From: Liz Rice Date: Mon, 4 Mar 2019 11:30:41 +0000 Subject: [PATCH] Better names, descriptions and tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When you query for resources, you get the ones you’re entitled to see - it’s misleading to suggest you’re getting all of them --- src/modules/hunting/apiserver.py | 38 ++++++++--------- tests/hunting/test_apiserver_hunter.py | 56 +++++++++++++++++++++----- 2 files changed, 66 insertions(+), 28 deletions(-) diff --git a/src/modules/hunting/apiserver.py b/src/modules/hunting/apiserver.py index b8f667b..be96d37 100644 --- a/src/modules/hunting/apiserver.py +++ b/src/modules/hunting/apiserver.py @@ -23,7 +23,7 @@ class ServerApiAccess(Vulnerability, Event): class ListPodsAndNamespaces(Vulnerability, Event): - """ Accessing the pods list under ALL of the namespaces might give an attacker valuable information""" + """ Accessing pods might give an attacker valuable information""" def __init__(self, evidence): Vulnerability.__init__(self, KubernetesCluster, name="Listing pods", @@ -31,29 +31,29 @@ class ListPodsAndNamespaces(Vulnerability, Event): self.evidence = evidence -class ListAllNamespaces(Vulnerability, Event): - """ Accessing all of the namespaces might give an attacker valuable information """ +class ListNamespaces(Vulnerability, Event): + """ Accessing namespaces might give an attacker valuable information """ def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing all namespaces", + Vulnerability.__init__(self, KubernetesCluster, name="Listing namespaces", category=InformationDisclosure) self.evidence = evidence -class ListAllRoles(Vulnerability, Event): - """ Accessing all of the roles might give an attacker valuable information """ +class ListRoles(Vulnerability, Event): + """ Accessing roles might give an attacker valuable information """ def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing all roles", + Vulnerability.__init__(self, KubernetesCluster, name="Listing roles", category=InformationDisclosure) self.evidence = evidence -class ListAllClusterRoles(Vulnerability, Event): - """ Accessing all of the cluster roles might give an attacker valuable information """ +class ListClusterRoles(Vulnerability, Event): + """ Accessing cluster roles might give an attacker valuable information """ def __init__(self, evidence): - Vulnerability.__init__(self, KubernetesCluster, name="Listing all cluster roles", + Vulnerability.__init__(self, KubernetesCluster, name="Listing cluster roles", category=InformationDisclosure) self.evidence = evidence @@ -238,13 +238,13 @@ class AccessApiServer(Hunter): pass return None - def get_all_namespaces(self): + def get_namespaces(self): return self.get_items("{path}/api/v1/namespaces".format(path=self.path)) - def get_all_cluster_roles(self): + def get_cluster_roles(self): return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(path=self.path)) - def get_all_roles(self): + def get_roles(self): return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/roles".format(path=self.path)) def execute(self): @@ -252,21 +252,21 @@ class AccessApiServer(Hunter): if api: self.publish_event(ServerApiAccess(api, self.category)) - namespaces = self.get_all_namespaces() + namespaces = self.get_namespaces() if namespaces: - self.publish_event(ListAllNamespaces(namespaces)) + self.publish_event(ListNamespaces(namespaces)) pods = self.get_pods() if pods: self.publish_event(ListPodsAndNamespaces(pods)) - roles = self.get_all_roles() + roles = self.get_roles() if roles: - self.publish_event(ListAllRoles(roles)) + self.publish_event(ListRoles(roles)) - cluster_roles = self.get_all_cluster_roles() + cluster_roles = self.get_cluster_roles() if cluster_roles: - self.publish_event(ListAllClusterRoles(cluster_roles)) + self.publish_event(ListClusterRoles(cluster_roles)) # If we have a service account token, this event should get triggered twice - once with and once without # the token diff --git a/tests/hunting/test_apiserver_hunter.py b/tests/hunting/test_apiserver_hunter.py index 965d939..210b6fc 100644 --- a/tests/hunting/test_apiserver_hunter.py +++ b/tests/hunting/test_apiserver_hunter.py @@ -1,7 +1,8 @@ import requests_mock +import time from src.modules.hunting.apiserver import AccessApiServer, AccessApiServerWithToken, ServerApiAccess, AccessApiServerActive -from src.modules.hunting.apiserver import ListAllNamespaces, ListPodsAndNamespaces, ListAllRoles, ListAllClusterRoles +from src.modules.hunting.apiserver import ListNamespaces, ListPodsAndNamespaces, ListRoles, ListClusterRoles from src.modules.hunting.apiserver import ApiServerPassiveHunterFinished from src.modules.hunting.apiserver import CreateANamespace, DeleteANamespace from src.modules.discovery.apiserver import ApiServer @@ -9,7 +10,11 @@ from src.core.events.types import Event from src.core.types import UnauthenticatedAccess, InformationDisclosure from src.core.events import handler +counter = 0 + def test_ApiServerToken(): + global counter + counter = 0 e = ApiServer() e.host = "1.2.3.4" @@ -19,13 +24,19 @@ def test_ApiServerToken(): h = AccessApiServerWithToken(e) assert h.event.auth_token == "my-secret-token" + # This test doesn't generate any events + time.sleep(0.01) + assert counter == 0 + def test_AccessApiServer(): + global counter + counter = 0 + e = ApiServer() e.host = "mockKubernetes" e.port = 443 with requests_mock.Mocker() as m: - # TODO check that these responses reflect what Kubernetes does m.get('https://mockKubernetes:443/api', text='{}') m.get('https://mockKubernetes:443/api/v1/namespaces', text='{"items":[{"metadata":{"name":"hello"}}]}') m.get('https://mockKubernetes:443/api/v1/pods', @@ -37,6 +48,12 @@ def test_AccessApiServer(): h = AccessApiServer(e) h.execute() + # We should see events for Server API Access, Namespaces, Pods, and the passive hunter finished + time.sleep(0.01) + assert counter == 4 + + # Try with an auth token + counter = 0 with requests_mock.Mocker() as m: # TODO check that these responses reflect what Kubernetes does m.get('https://mockKubernetesToken:443/api', text='{}') @@ -53,20 +70,27 @@ def test_AccessApiServer(): h = AccessApiServerWithToken(e) h.execute() + # We should see the same set of events but with the addition of Cluster Roles + time.sleep(0.01) + assert counter == 5 -@handler.subscribe(ListAllNamespaces) -class test_ListAllNamespaces(object): +@handler.subscribe(ListNamespaces) +class test_ListNamespaces(object): def __init__(self, event): + print("ListNamespaces") assert event.evidence == ['hello'] if event.host == "mockKubernetesToken": assert event.auth_token == "so-secret" else: assert event.auth_token is None + global counter + counter += 1 @handler.subscribe(ListPodsAndNamespaces) class test_ListPodsAndNamespaces(object): def __init__(self, event): + print("ListPodsAndNamespaces") assert len(event.evidence) == 2 for pod in event.evidence: if pod["name"] == "podA": @@ -77,33 +101,47 @@ class test_ListPodsAndNamespaces(object): assert event.auth_token == "so-secret" else: assert event.auth_token is None + global counter + counter += 1 # Should never see this because the API call in the test returns 403 status code -@handler.subscribe(ListAllRoles) -class test_ListAllRoles(object): +@handler.subscribe(ListRoles) +class test_ListRoles(object): def __init__(self, event): + print("ListRoles") assert 0 + global counter + counter += 1 # Should only see this when we have a token because the API call returns an empty list of items # in the test where we have no token -@handler.subscribe(ListAllClusterRoles) -class test_ListAllClusterRoles(object): +@handler.subscribe(ListClusterRoles) +class test_ListClusterRoles(object): def __init__(self, event): + print("ListClusterRoles") assert event.auth_token == "so-secret" + global counter + counter += 1 @handler.subscribe(ServerApiAccess) class test_ServerApiAccess(object): def __init__(self, event): + print("ServerApiAccess") if event.category == UnauthenticatedAccess: assert event.auth_token is None else: assert event.category == InformationDisclosure assert event.auth_token is not None + global counter + counter += 1 @handler.subscribe(ApiServerPassiveHunterFinished) class test_PassiveHunterFinished(object): def __init__(self, event): + print("PassiveHunterFinished") assert event.namespaces == ["hello"] + global counter + counter += 1 def test_AccessApiServerActive(): e = ApiServerPassiveHunterFinished(namespaces=["hello-namespace"]) @@ -111,7 +149,7 @@ def test_AccessApiServerActive(): e.port = 443 with requests_mock.Mocker() as m: - # TODO check that these responses reflect what Kubernetes does + # TODO more tests here with real responses m.post('https://mockKubernetes:443/api/v1/namespaces', text=""" { "kind": "Namespace",