Show in the report whether access was via service token or not

This commit is contained in:
Liz Rice
2019-03-04 12:35:57 +00:00
parent dd1ed76dc1
commit e77f5fdcc8
2 changed files with 40 additions and 27 deletions

View File

@@ -17,45 +17,51 @@ from ...core.types import RemoteCodeExec, AccessRisk, InformationDisclosure, Una
class ServerApiAccess(Vulnerability, Event):
""" The API Server port is accessible. Depending on your RBAC settings this could expose access to or control of your cluster. """
def __init__(self, evidence, category):
Vulnerability.__init__(self, KubernetesCluster, name="Access to server API", category=category)
def __init__(self, evidence, using_token):
if using_token:
name = "Access to API using service account token"
category = InformationDisclosure
else:
name = "Unauthenticated access to API"
category = UnauthenticatedAccess
Vulnerability.__init__(self, KubernetesCluster, name=name, category=category)
self.evidence = evidence
class ApiInfoDisclosure(Vulnerability, Event):
def __init__(self, evidence, using_token, name):
if using_token:
name +=" using service account token"
else:
name +=" as anonymous user"
Vulnerability.__init__(self, KubernetesCluster, name=name, category=InformationDisclosure)
self.evidence = evidence
class ListPodsAndNamespaces(Vulnerability, Event):
class ListPodsAndNamespaces(ApiInfoDisclosure):
""" Accessing pods might give an attacker valuable information"""
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing pods",
category=InformationDisclosure)
self.evidence = evidence
def __init__(self, evidence, using_token):
ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing pods")
class ListNamespaces(Vulnerability, Event):
class ListNamespaces(ApiInfoDisclosure):
""" Accessing namespaces might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing namespaces",
category=InformationDisclosure)
self.evidence = evidence
def __init__(self, evidence, using_token):
ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing namespaces")
class ListRoles(Vulnerability, Event):
class ListRoles(ApiInfoDisclosure):
""" Accessing roles might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing roles",
category=InformationDisclosure)
self.evidence = evidence
def __init__(self, evidence, using_token):
ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing roles")
class ListClusterRoles(Vulnerability, Event):
class ListClusterRoles(ApiInfoDisclosure):
""" Accessing cluster roles might give an attacker valuable information """
def __init__(self, evidence):
Vulnerability.__init__(self, KubernetesCluster, name="Listing cluster roles",
category=InformationDisclosure)
self.evidence = evidence
def __init__(self, evidence, using_token):
ApiInfoDisclosure.__init__(self, evidence, using_token, "Listing cluster roles")
class CreateANamespace(Vulnerability, Event):
@@ -191,6 +197,7 @@ class AccessApiServer(Hunter):
self.path = "https://{}:{}".format(self.event.host, self.event.port)
self.headers = {}
self.category = UnauthenticatedAccess
self.with_token = False
def access_api_server(self):
logging.debug('Passive Hunter is attempting to access the API at {host}:{port}'.format(host=self.event.host,
@@ -250,23 +257,24 @@ class AccessApiServer(Hunter):
def execute(self):
api = self.access_api_server()
if api:
self.publish_event(ServerApiAccess(api, self.category))
self.publish_event(ServerApiAccess(api, self.with_token))
namespaces = self.get_namespaces()
if namespaces:
self.publish_event(ListNamespaces(namespaces))
self.publish_event(ListNamespaces(namespaces, self.with_token))
pods = self.get_pods()
if pods:
self.publish_event(ListPodsAndNamespaces(pods))
print pods
self.publish_event(ListPodsAndNamespaces(pods, self.with_token))
roles = self.get_roles()
if roles:
self.publish_event(ListRoles(roles))
self.publish_event(ListRoles(roles, self.with_token))
cluster_roles = self.get_cluster_roles()
if cluster_roles:
self.publish_event(ListClusterRoles(cluster_roles))
self.publish_event(ListClusterRoles(cluster_roles, self.with_token))
# If we have a service account token, this event should get triggered twice - once with and once without
# the token
@@ -283,6 +291,7 @@ class AccessApiServerWithToken(AccessApiServer):
assert self.event.auth_token != ''
self.headers = {'Authorization': 'Bearer ' + self.event.auth_token}
self.category = InformationDisclosure
self.with_token = True
# Active Hunter

View File

@@ -99,8 +99,12 @@ class test_ListPodsAndNamespaces(object):
assert pod["namespace"] == "namespaceB"
if event.host == "mockKubernetesToken":
assert event.auth_token == "so-secret"
assert "token" in event.name
assert "anon" not in event.name
else:
assert event.auth_token is None
assert "token" not in event.name
assert "anon" in event.name
global counter
counter += 1