mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-05-06 17:28:30 +00:00
Use a predicate and avoid a whole extra event
This commit is contained in:
@@ -29,13 +29,8 @@ class ApiServerDiscovery(Hunter):
|
||||
|
||||
def execute(self):
|
||||
logging.debug("Attempting to discover an API server")
|
||||
|
||||
# We can discover the API Server with or without the use of a service account token
|
||||
main_request = requests.get("https://{}:{}".format(self.event.host, self.event.port), verify=False).text
|
||||
if '"code"' in main_request:
|
||||
self.event.role = "Master"
|
||||
self.publish_event(ApiServer())
|
||||
|
||||
# But if we have a service account token we will try additional checks
|
||||
if self.event.auth_token:
|
||||
self.publish_event(ApiServerWithServiceAccountToken())
|
||||
|
||||
@@ -186,7 +186,8 @@ class ApiServerPassiveHunterFinished(Event):
|
||||
self.namespaces = namespaces
|
||||
|
||||
|
||||
# Passive Hunter
|
||||
# This Hunter checks what happens if we try to access the API Server without a service account token
|
||||
# If we have a service account token we'll also trigger AccessApiServerWithToken below
|
||||
@handler.subscribe(ApiServer)
|
||||
class AccessApiServer(Hunter):
|
||||
""" API Server Hunter
|
||||
@@ -269,7 +270,7 @@ class AccessApiServer(Hunter):
|
||||
# the token
|
||||
self.publish_event(ApiServerPassiveHunterFinished(namespaces))
|
||||
|
||||
@handler.subscribe(ApiServerWithServiceAccountToken)
|
||||
@handler.subscribe(ApiServer, predicate=lambda x: x.auth_token)
|
||||
class AccessApiServerWithToken(AccessApiServer):
|
||||
""" API Server Hunter
|
||||
Accessing the API server using the service account token obtained from a compromised pod
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import requests_mock
|
||||
import time
|
||||
|
||||
from src.modules.discovery.apiserver import ApiServer, ApiServerDiscovery, ApiServerWithServiceAccountToken
|
||||
from src.modules.discovery.apiserver import ApiServer, ApiServerDiscovery
|
||||
from src.core.events.types import Event
|
||||
from src.core.events import handler
|
||||
|
||||
@@ -46,19 +46,18 @@ def test_ApiServerWithServiceAccountToken():
|
||||
time.sleep(0.1)
|
||||
assert counter == 1
|
||||
|
||||
# If we have a token this should create two events
|
||||
e.auth_token = "very_secret"
|
||||
a = ApiServerDiscovery(e)
|
||||
a.execute()
|
||||
time.sleep(0.1)
|
||||
assert counter == 3
|
||||
assert counter == 2
|
||||
|
||||
# But we shouldn't generate an event if we don't see an error code
|
||||
e.host = 'mockOther'
|
||||
a = ApiServerDiscovery(e)
|
||||
a.execute()
|
||||
time.sleep(0.1)
|
||||
assert counter == 3
|
||||
assert counter == 2
|
||||
|
||||
|
||||
# We should only generate an ApiServer event for a response that looks like it came from a Kubernetes node
|
||||
@@ -67,12 +66,4 @@ class testApiServer(object):
|
||||
def __init__(self, event):
|
||||
assert event.host == 'mockKubernetes'
|
||||
global counter
|
||||
counter += 1
|
||||
|
||||
@handler.subscribe(ApiServerWithServiceAccountToken)
|
||||
class testApiServerWithServiceAccountToken(object):
|
||||
def __init__(self, event):
|
||||
assert event.host == 'mockKubernetes'
|
||||
assert event.auth_token == "very_secret"
|
||||
global counter
|
||||
counter += 1
|
||||
counter += 1
|
||||
@@ -135,7 +135,7 @@ class test_ServerApiAccess(object):
|
||||
assert event.auth_token is None
|
||||
else:
|
||||
assert event.category == InformationDisclosure
|
||||
assert event.auth_token is not None
|
||||
assert event.auth_token == "so-secret"
|
||||
global counter
|
||||
counter += 1
|
||||
|
||||
|
||||
Reference in New Issue
Block a user