Better names, descriptions and tests

When you query for resources, you get the ones you’re entitled to see - it’s misleading to suggest you’re getting all of them
This commit is contained in:
Liz Rice
2019-03-04 11:30:41 +00:00
parent 5c22ecdf3c
commit dd1ed76dc1
2 changed files with 66 additions and 28 deletions

View File

@@ -23,7 +23,7 @@ class ServerApiAccess(Vulnerability, Event):
class ListPodsAndNamespaces(Vulnerability, Event):
""" Accessing the pods list under ALL of the namespaces might give an attacker valuable information"""
""" Accessing pods might give an attacker valuable information"""
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing pods",
@@ -31,29 +31,29 @@ class ListPodsAndNamespaces(Vulnerability, Event):
self.evidence = evidence
class ListAllNamespaces(Vulnerability, Event):
""" Accessing all of the namespaces might give an attacker valuable information """
class ListNamespaces(Vulnerability, Event):
""" Accessing namespaces might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing all namespaces",
Vulnerability.__init__(self, KubernetesCluster, name="Listing namespaces",
category=InformationDisclosure)
self.evidence = evidence
class ListAllRoles(Vulnerability, Event):
""" Accessing all of the roles might give an attacker valuable information """
class ListRoles(Vulnerability, Event):
""" Accessing roles might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing all roles",
Vulnerability.__init__(self, KubernetesCluster, name="Listing roles",
category=InformationDisclosure)
self.evidence = evidence
class ListAllClusterRoles(Vulnerability, Event):
""" Accessing all of the cluster roles might give an attacker valuable information """
class ListClusterRoles(Vulnerability, Event):
""" Accessing cluster roles might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing all cluster roles",
Vulnerability.__init__(self, KubernetesCluster, name="Listing cluster roles",
category=InformationDisclosure)
self.evidence = evidence
@@ -238,13 +238,13 @@ class AccessApiServer(Hunter):
pass
return None
def get_all_namespaces(self):
def get_namespaces(self):
return self.get_items("{path}/api/v1/namespaces".format(path=self.path))
def get_all_cluster_roles(self):
def get_cluster_roles(self):
return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/clusterroles".format(path=self.path))
def get_all_roles(self):
def get_roles(self):
return self.get_items("{path}/apis/rbac.authorization.k8s.io/v1/roles".format(path=self.path))
def execute(self):
@@ -252,21 +252,21 @@ class AccessApiServer(Hunter):
if api:
self.publish_event(ServerApiAccess(api, self.category))
namespaces = self.get_all_namespaces()
namespaces = self.get_namespaces()
if namespaces:
self.publish_event(ListAllNamespaces(namespaces))
self.publish_event(ListNamespaces(namespaces))
pods = self.get_pods()
if pods:
self.publish_event(ListPodsAndNamespaces(pods))
roles = self.get_all_roles()
roles = self.get_roles()
if roles:
self.publish_event(ListAllRoles(roles))
self.publish_event(ListRoles(roles))
cluster_roles = self.get_all_cluster_roles()
cluster_roles = self.get_cluster_roles()
if cluster_roles:
self.publish_event(ListAllClusterRoles(cluster_roles))
self.publish_event(ListClusterRoles(cluster_roles))
# If we have a service account token, this event should get triggered twice - once with and once without
# the token

View File

@@ -1,7 +1,8 @@
import requests_mock
import time
from src.modules.hunting.apiserver import AccessApiServer, AccessApiServerWithToken, ServerApiAccess, AccessApiServerActive
from src.modules.hunting.apiserver import ListAllNamespaces, ListPodsAndNamespaces, ListAllRoles, ListAllClusterRoles
from src.modules.hunting.apiserver import ListNamespaces, ListPodsAndNamespaces, ListRoles, ListClusterRoles
from src.modules.hunting.apiserver import ApiServerPassiveHunterFinished
from src.modules.hunting.apiserver import CreateANamespace, DeleteANamespace
from src.modules.discovery.apiserver import ApiServer
@@ -9,7 +10,11 @@ from src.core.events.types import Event
from src.core.types import UnauthenticatedAccess, InformationDisclosure
from src.core.events import handler
counter = 0
def test_ApiServerToken():
global counter
counter = 0
e = ApiServer()
e.host = "1.2.3.4"
@@ -19,13 +24,19 @@ def test_ApiServerToken():
h = AccessApiServerWithToken(e)
assert h.event.auth_token == "my-secret-token"
# This test doesn't generate any events
time.sleep(0.01)
assert counter == 0
def test_AccessApiServer():
global counter
counter = 0
e = ApiServer()
e.host = "mockKubernetes"
e.port = 443
with requests_mock.Mocker() as m:
# TODO check that these responses reflect what Kubernetes does
m.get('https://mockKubernetes:443/api', text='{}')
m.get('https://mockKubernetes:443/api/v1/namespaces', text='{"items":[{"metadata":{"name":"hello"}}]}')
m.get('https://mockKubernetes:443/api/v1/pods',
@@ -37,6 +48,12 @@ def test_AccessApiServer():
h = AccessApiServer(e)
h.execute()
# We should see events for Server API Access, Namespaces, Pods, and the passive hunter finished
time.sleep(0.01)
assert counter == 4
# Try with an auth token
counter = 0
with requests_mock.Mocker() as m:
# TODO check that these responses reflect what Kubernetes does
m.get('https://mockKubernetesToken:443/api', text='{}')
@@ -53,20 +70,27 @@ def test_AccessApiServer():
h = AccessApiServerWithToken(e)
h.execute()
# We should see the same set of events but with the addition of Cluster Roles
time.sleep(0.01)
assert counter == 5
@handler.subscribe(ListAllNamespaces)
class test_ListAllNamespaces(object):
@handler.subscribe(ListNamespaces)
class test_ListNamespaces(object):
def __init__(self, event):
print("ListNamespaces")
assert event.evidence == ['hello']
if event.host == "mockKubernetesToken":
assert event.auth_token == "so-secret"
else:
assert event.auth_token is None
global counter
counter += 1
@handler.subscribe(ListPodsAndNamespaces)
class test_ListPodsAndNamespaces(object):
def __init__(self, event):
print("ListPodsAndNamespaces")
assert len(event.evidence) == 2
for pod in event.evidence:
if pod["name"] == "podA":
@@ -77,33 +101,47 @@ class test_ListPodsAndNamespaces(object):
assert event.auth_token == "so-secret"
else:
assert event.auth_token is None
global counter
counter += 1
# Should never see this because the API call in the test returns 403 status code
@handler.subscribe(ListAllRoles)
class test_ListAllRoles(object):
@handler.subscribe(ListRoles)
class test_ListRoles(object):
def __init__(self, event):
print("ListRoles")
assert 0
global counter
counter += 1
# Should only see this when we have a token because the API call returns an empty list of items
# in the test where we have no token
@handler.subscribe(ListAllClusterRoles)
class test_ListAllClusterRoles(object):
@handler.subscribe(ListClusterRoles)
class test_ListClusterRoles(object):
def __init__(self, event):
print("ListClusterRoles")
assert event.auth_token == "so-secret"
global counter
counter += 1
@handler.subscribe(ServerApiAccess)
class test_ServerApiAccess(object):
def __init__(self, event):
print("ServerApiAccess")
if event.category == UnauthenticatedAccess:
assert event.auth_token is None
else:
assert event.category == InformationDisclosure
assert event.auth_token is not None
global counter
counter += 1
@handler.subscribe(ApiServerPassiveHunterFinished)
class test_PassiveHunterFinished(object):
def __init__(self, event):
print("PassiveHunterFinished")
assert event.namespaces == ["hello"]
global counter
counter += 1
def test_AccessApiServerActive():
e = ApiServerPassiveHunterFinished(namespaces=["hello-namespace"])
@@ -111,7 +149,7 @@ def test_AccessApiServerActive():
e.port = 443
with requests_mock.Mocker() as m:
# TODO check that these responses reflect what Kubernetes does
# TODO more tests here with real responses
m.post('https://mockKubernetes:443/api/v1/namespaces', text="""
{
"kind": "Namespace",