mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-02-15 02:20:10 +00:00
Compare commits
30 Commits
refresh_wo
...
refactor_h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
17513d2c6f | ||
|
|
7c2ec7c03c | ||
|
|
ea5e17116d | ||
|
|
79a7882cba | ||
|
|
2672124169 | ||
|
|
023a5d6640 | ||
|
|
f5e76f7c1e | ||
|
|
710aa63dc2 | ||
|
|
cc7f708c7e | ||
|
|
0da9c97031 | ||
|
|
06f73244a5 | ||
|
|
cb3c1dd3b7 | ||
|
|
c40547e387 | ||
|
|
676b23e68a | ||
|
|
6a7ba489f5 | ||
|
|
014b472c0c | ||
|
|
4f01667a6b | ||
|
|
82982183fd | ||
|
|
127029f46c | ||
|
|
9dae39eb54 | ||
|
|
ceeb70c5f4 | ||
|
|
903c0689f0 | ||
|
|
5f39ee0963 | ||
|
|
2c8e8467d2 | ||
|
|
34be06eaa4 | ||
|
|
f6782b9ffc | ||
|
|
6b2a382ada | ||
|
|
391faffe49 | ||
|
|
b3155fcdb0 | ||
|
|
5db3f057a8 |
@@ -1,3 +1,5 @@
|
||||
from enum import Enum
|
||||
|
||||
class HunterBase(object):
|
||||
publishedVulnerabilities = 0
|
||||
|
||||
@@ -32,6 +34,19 @@ class Discovery(HunterBase):
|
||||
pass
|
||||
|
||||
|
||||
""" Clouds Enum """
|
||||
class CloudTypes(Enum):
|
||||
"""Values are as defined in azurespeed"""
|
||||
AKS = "Azure"
|
||||
EKS = "AWS"
|
||||
ACK = "AliCloud"
|
||||
NO_CLOUD = "No Cloud"
|
||||
|
||||
@classmethod
|
||||
def get_enum(cls, value):
|
||||
return {item.value: item for item in cls}.get(value, cls.NO_CLOUD)
|
||||
|
||||
|
||||
"""Kubernetes Components"""
|
||||
class KubernetesCluster():
|
||||
"""Kubernetes Cluster"""
|
||||
@@ -78,4 +93,5 @@ class PrivilegeEscalation(KubernetesCluster):
|
||||
class DenialOfService(object):
|
||||
name = "Denial of Service"
|
||||
|
||||
|
||||
from .events import handler # import is in the bottom to break import loops
|
||||
|
||||
59
src/modules/discovery/aks.py
Normal file
59
src/modules/discovery/aks.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
|
||||
import requests
|
||||
from netaddr import IPNetwork
|
||||
|
||||
from __main__ import config
|
||||
|
||||
from ...core.events import handler
|
||||
from ...core.events.types import Event, NewHostEvent, Vulnerability
|
||||
from ...core.types import Discovery, InformationDisclosure, Azure, CloudTypes
|
||||
|
||||
from .hosts import RunningPodOnCloud, HostDiscoveryUtils
|
||||
|
||||
class AzureMetadataApi(Vulnerability, Event):
|
||||
"""Access to the Azure Metadata API exposes information about the machines associated with the cluster"""
|
||||
def __init__(self, cidr):
|
||||
Vulnerability.__init__(self, Azure, "Azure Metadata Exposure", category=InformationDisclosure)
|
||||
self.cidr = cidr
|
||||
self.evidence = "cidr: {}".format(cidr)
|
||||
|
||||
@handler.subscribe(RunningPodOnCloud, predicate=lambda x: x.cloud == CloudTypes.AKS)
|
||||
class AzureHostDiscovery(Discovery):
|
||||
"""Azure Host Discovery
|
||||
Discovers AKS specific nodes when running as a pod in Azure
|
||||
"""
|
||||
def __init__(self, event):
|
||||
self.event = event
|
||||
|
||||
def is_azure_api(self):
|
||||
try:
|
||||
logging.debug("From pod attempting to access Azure Metadata API")
|
||||
if requests.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", headers={"Metadata":"true"}, timeout=5).status_code == 200:
|
||||
return True
|
||||
except requests.exceptions.ConnectionError:
|
||||
return False
|
||||
|
||||
# quering azure's interface metadata api | works only from a pod
|
||||
def azure_metadata_subnets_discovery(self):
|
||||
logging.debug("From pod attempting to access azure's metadata")
|
||||
machine_metadata = json.loads(requests.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", headers={"Metadata":"true"}).text)
|
||||
subnets = list()
|
||||
for interface in machine_metadata["network"]["interface"]:
|
||||
address, subnet = interface["ipv4"]["subnet"][0]["address"], interface["ipv4"]["subnet"][0]["prefix"]
|
||||
logging.debug("From pod discovered subnet {0}/{1}".format(address, subnet if not config.quick else "24"))
|
||||
subnets.append([address,subnet if not config.quick else "24"])
|
||||
|
||||
self.publish_event(AzureMetadataApi(cidr="{}/{}".format(address, subnet)))
|
||||
return subnets
|
||||
|
||||
def execute(self):
|
||||
if self.is_azure_api():
|
||||
for address, cidr in self.azure_metadata_subnets_discovery():
|
||||
logging.debug("Azure subnet scanning {0}/{1}".format(address, cidr))
|
||||
for ip in HostDiscoveryUtils.generate_subnet(ip=address, sn=cidr):
|
||||
self.publish_event(NewHostEvent(host=ip, cloud=CloudTypes.AKS))
|
||||
|
||||
@@ -4,17 +4,23 @@ import logging
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
import requests
|
||||
from netaddr import IPNetwork, IPAddress
|
||||
|
||||
from __main__ import config
|
||||
from netifaces import AF_INET, ifaddresses, interfaces
|
||||
from netifaces import AF_INET, ifaddresses, interfaces, gateways
|
||||
|
||||
from ...core.events import handler
|
||||
from ...core.events.types import Event, NewHostEvent, Vulnerability
|
||||
from ...core.types import Discovery, InformationDisclosure, Azure
|
||||
from ...core.types import Discovery, InformationDisclosure, Azure, CloudTypes
|
||||
|
||||
class RunningPodOnCloud(Event):
|
||||
def __init__(self, cloud):
|
||||
self.cloud = cloud
|
||||
|
||||
class HostScanEvent(Event):
|
||||
pass
|
||||
|
||||
class RunningAsPodEvent(Event):
|
||||
def __init__(self):
|
||||
@@ -29,7 +35,6 @@ class RunningAsPodEvent(Event):
|
||||
location = "Local to Pod"
|
||||
if 'HOSTNAME' in os.environ:
|
||||
location += "(" + os.environ['HOSTNAME'] + ")"
|
||||
|
||||
return location
|
||||
|
||||
def get_service_account_file(self, file):
|
||||
@@ -39,43 +44,55 @@ class RunningAsPodEvent(Event):
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
class AzureMetadataApi(Vulnerability, Event):
|
||||
"""Access to the Azure Metadata API exposes information about the machines associated with the cluster"""
|
||||
def __init__(self, cidr):
|
||||
Vulnerability.__init__(self, Azure, "Azure Metadata Exposure", category=InformationDisclosure, vid="KHV003")
|
||||
self.cidr = cidr
|
||||
self.evidence = "cidr: {}".format(cidr)
|
||||
|
||||
class HostScanEvent(Event):
|
||||
def __init__(self, pod=False, active=False, predefined_hosts=list()):
|
||||
self.active = active # flag to specify whether to get actual data from vulnerabilities
|
||||
self.predefined_hosts = predefined_hosts
|
||||
|
||||
class HostDiscoveryHelpers:
|
||||
class HostDiscoveryUtils:
|
||||
""" Static class containes util functions for Host discovery processes """
|
||||
@staticmethod
|
||||
def get_cloud(host):
|
||||
""" Returns cloud for a given ip address, defaults to NO_CLOUD """
|
||||
cloud = ""
|
||||
try:
|
||||
logging.debug("Checking whether the cluster is deployed on azure's cloud")
|
||||
logging.debug("Checking if {} is deployed on a cloud".format(host))
|
||||
# azurespeed.com provide their API via HTTP only; the service can be queried with
|
||||
# HTTPS, but doesn't show a proper certificate. Since no encryption is worse then
|
||||
# any encryption, we go with the verify=false option for the time being. At least
|
||||
# this prevents leaking internal IP addresses to passive eavesdropping.
|
||||
# TODO: find a more secure service to detect cloud IPs
|
||||
metadata = requests.get("https://www.azurespeed.com/api/region?ipOrUrl={ip}".format(ip=host), verify=False).text
|
||||
if "cloud" in metadata:
|
||||
cloud = json.loads(metadata)["cloud"]
|
||||
except requests.ConnectionError as e:
|
||||
logging.info("- unable to check cloud: {0}".format(e))
|
||||
return
|
||||
if "cloud" in metadata:
|
||||
return json.loads(metadata)["cloud"]
|
||||
|
||||
# generator, generating a subnet by given a cidr
|
||||
return CloudTypes.get_enum(cloud)
|
||||
|
||||
@staticmethod
|
||||
def get_default_gateway():
|
||||
return gateways()['default'][AF_INET][0]
|
||||
|
||||
@staticmethod
|
||||
def get_external_ip():
|
||||
external_ip = None
|
||||
try:
|
||||
logging.debug("HostDiscovery hunter attempting to get external IP address")
|
||||
external_ip = requests.get("https://canhazip.com", verify=False).text # getting external ip, to determine if cloud cluster
|
||||
except requests.ConnectionError as e:
|
||||
logging.debug("unable to determine external IP address: {0}".format(e))
|
||||
return external_ip
|
||||
|
||||
# generator, generating ip addresses from a given cidr
|
||||
@staticmethod
|
||||
def generate_subnet(ip, sn="24"):
|
||||
logging.debug("HostDiscoveryHelpers.generate_subnet {0}/{1}".format(ip, sn))
|
||||
subnet = IPNetwork('{ip}/{sn}'.format(ip=ip, sn=sn))
|
||||
for ip in IPNetwork(subnet):
|
||||
logging.debug("HostDiscoveryHelpers.generate_subnet yielding {0}".format(ip))
|
||||
yield ip
|
||||
logging.debug("HostDiscoveryUtils.generate_subnet {0}/{1}".format(ip, sn))
|
||||
return IPNetwork('{ip}/{sn}'.format(ip=ip, sn=sn))
|
||||
|
||||
# generate ip addresses from all internal network interfaces
|
||||
@staticmethod
|
||||
def generate_interfaces_subnet(sn='24'):
|
||||
for ifaceName in interfaces():
|
||||
for ip in [i['addr'] for i in ifaddresses(ifaceName).setdefault(AF_INET, [])]:
|
||||
for ip in HostDiscoveryUtils.generate_subnet(ip, sn):
|
||||
yield ip
|
||||
|
||||
|
||||
@handler.subscribe(RunningAsPodEvent)
|
||||
@@ -87,59 +104,36 @@ class FromPodHostDiscovery(Discovery):
|
||||
self.event = event
|
||||
|
||||
def execute(self):
|
||||
# Scan any hosts that the user specified
|
||||
# If user has specified specific remotes, scanning only them
|
||||
if config.remote or config.cidr:
|
||||
self.publish_event(HostScanEvent())
|
||||
else:
|
||||
# Discover cluster subnets, we'll scan all these hosts
|
||||
if self.is_azure_pod():
|
||||
subnets, cloud = self.azure_metadata_discovery()
|
||||
else:
|
||||
subnets, cloud = self.traceroute_discovery()
|
||||
# figuring out the cloud from the external ip, default to CloudTypes.NO_CLOUD
|
||||
external_ip = HostDiscoveryUtils.get_external_ip()
|
||||
cloud = HostDiscoveryUtils.get_cloud(external_ip)
|
||||
|
||||
should_scan_apiserver = False
|
||||
# specific cloud discoveries should subscribe to RunningPodOnCloud
|
||||
if cloud != CloudTypes.NO_CLOUD:
|
||||
self.publish_event(RunningPodOnCloud(cloud=cloud))
|
||||
|
||||
# normal pod discovery
|
||||
pod_subnet = self.pod_subnet_discovery()
|
||||
logging.debug("From pod scanning subnet {0}/{1}".format(pod_subnet[0], pod_subnet[1]))
|
||||
for ip in HostDiscoveryUtils.generate_subnet(ip=pod_subnet[0], sn=pod_subnet[1]):
|
||||
self.publish_event(NewHostEvent(host=ip, cloud=cloud))
|
||||
|
||||
# manually publishing the Api server host if outside the subnet
|
||||
if self.event.kubeservicehost:
|
||||
should_scan_apiserver = True
|
||||
for subnet in subnets:
|
||||
if self.event.kubeservicehost and self.event.kubeservicehost in IPNetwork("{}/{}".format(subnet[0], subnet[1])):
|
||||
should_scan_apiserver = False
|
||||
logging.debug("From pod scanning subnet {0}/{1}".format(subnet[0], subnet[1]))
|
||||
for ip in HostDiscoveryHelpers.generate_subnet(ip=subnet[0], sn=subnet[1]):
|
||||
self.publish_event(NewHostEvent(host=ip, cloud=cloud))
|
||||
if should_scan_apiserver:
|
||||
self.publish_event(NewHostEvent(host=IPAddress(self.event.kubeservicehost), cloud=cloud))
|
||||
if self.event.kubeservicehost not in IPNetwork("{}/{}".format(pod_subnet[0], pod_subnet[1])):
|
||||
self.publish_event(NewHostEvent(host=IPAddress(self.event.kubeservicehost), cloud=cloud))
|
||||
|
||||
|
||||
def is_azure_pod(self):
|
||||
try:
|
||||
logging.debug("From pod attempting to access Azure Metadata API")
|
||||
if requests.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", headers={"Metadata":"true"}, timeout=5).status_code == 200:
|
||||
return True
|
||||
except requests.exceptions.ConnectionError:
|
||||
return False
|
||||
def pod_subnet_discovery(self):
|
||||
# normal option when running as a pod is to scan it's own subnet
|
||||
# The gateway connects us to the host, and we can discover the
|
||||
# kubelet from there, other ip's are pods that are running
|
||||
# next to us,
|
||||
return HostDiscoveryUtils.get_default_gateway(), "24"
|
||||
|
||||
# for pod scanning
|
||||
def traceroute_discovery(self):
|
||||
external_ip = requests.get("http://canhazip.com").text # getting external ip, to determine if cloud cluster
|
||||
from scapy.all import ICMP, IP, Ether, srp1
|
||||
|
||||
node_internal_ip = srp1(Ether() / IP(dst="google.com" , ttl=1) / ICMP(), verbose=0)[IP].src
|
||||
return [ [node_internal_ip,"24"], ], external_ip
|
||||
|
||||
# querying azure's interface metadata api | works only from a pod
|
||||
def azure_metadata_discovery(self):
|
||||
logging.debug("From pod attempting to access azure's metadata")
|
||||
machine_metadata = json.loads(requests.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", headers={"Metadata":"true"}).text)
|
||||
address, subnet = "", ""
|
||||
subnets = list()
|
||||
for interface in machine_metadata["network"]["interface"]:
|
||||
address, subnet = interface["ipv4"]["subnet"][0]["address"], interface["ipv4"]["subnet"][0]["prefix"]
|
||||
logging.debug("From pod discovered subnet {0}/{1}".format(address, subnet if not config.quick else "24"))
|
||||
subnets.append([address,subnet if not config.quick else "24"])
|
||||
|
||||
self.publish_event(AzureMetadataApi(cidr="{}/{}".format(address, subnet)))
|
||||
|
||||
return subnets, "Azure"
|
||||
|
||||
@handler.subscribe(HostScanEvent)
|
||||
class HostDiscovery(Discovery):
|
||||
@@ -150,43 +144,25 @@ class HostDiscovery(Discovery):
|
||||
self.event = event
|
||||
|
||||
def execute(self):
|
||||
# handling multiple scan options
|
||||
if config.cidr:
|
||||
try:
|
||||
ip, sn = config.cidr.split('/')
|
||||
except ValueError as e:
|
||||
logging.error("unable to parse cidr: {0}".format(e))
|
||||
return
|
||||
cloud = HostDiscoveryHelpers.get_cloud(ip)
|
||||
for ip in HostDiscoveryHelpers.generate_subnet(ip, sn=sn):
|
||||
cloud = HostDiscoveryUtils.get_cloud(ip)
|
||||
for ip in HostDiscoveryUtils.generate_subnet(ip, sn=sn):
|
||||
self.publish_event(NewHostEvent(host=ip, cloud=cloud))
|
||||
elif config.interface:
|
||||
if config.interface:
|
||||
self.scan_interfaces()
|
||||
elif len(config.remote) > 0:
|
||||
if config.remote:
|
||||
for host in config.remote:
|
||||
self.publish_event(NewHostEvent(host=host, cloud=HostDiscoveryHelpers.get_cloud(host)))
|
||||
self.publish_event(NewHostEvent(host=host, cloud=HostDiscoveryUtils.get_cloud(host)))
|
||||
|
||||
# for normal scanning
|
||||
def scan_interfaces(self):
|
||||
try:
|
||||
logging.debug("HostDiscovery hunter attempting to get external IP address")
|
||||
external_ip = requests.get("http://canhazip.com").text # getting external ip, to determine if cloud cluster
|
||||
except requests.ConnectionError as e:
|
||||
logging.debug("unable to determine local IP address: {0}".format(e))
|
||||
logging.info("~ default to 127.0.0.1")
|
||||
external_ip = "127.0.0.1"
|
||||
cloud = HostDiscoveryHelpers.get_cloud(external_ip)
|
||||
for ip in self.generate_interfaces_subnet():
|
||||
external_ip = HostDiscoveryUtils.get_external_ip()
|
||||
cloud = HostDiscoveryUtils.get_cloud(host=external_ip)
|
||||
for ip in HostDiscoveryUtils.generate_interfaces_subnet():
|
||||
handler.publish_event(NewHostEvent(host=ip, cloud=cloud))
|
||||
|
||||
# generate all subnets from all internal network interfaces
|
||||
def generate_interfaces_subnet(self, sn='24'):
|
||||
for ifaceName in interfaces():
|
||||
for ip in [i['addr'] for i in ifaddresses(ifaceName).setdefault(AF_INET, [])]:
|
||||
if not self.event.localhost and InterfaceTypes.LOCALHOST.value in ip.__str__():
|
||||
continue
|
||||
for ip in HostDiscoveryHelpers.generate_subnet(ip, sn):
|
||||
yield ip
|
||||
|
||||
# for comparing prefixes
|
||||
class InterfaceTypes(Enum):
|
||||
LOCALHOST = "127"
|
||||
|
||||
@@ -7,7 +7,7 @@ from .kubelet import ExposedRunHandler
|
||||
|
||||
from ...core.events import handler
|
||||
from ...core.events.types import Event, Vulnerability
|
||||
from ...core.types import Hunter, ActiveHunter, IdentityTheft, Azure
|
||||
from ...core.types import Hunter, ActiveHunter, IdentityTheft, Azure, CloudTypes
|
||||
|
||||
|
||||
class AzureSpnExposure(Vulnerability, Event):
|
||||
@@ -16,7 +16,7 @@ class AzureSpnExposure(Vulnerability, Event):
|
||||
Vulnerability.__init__(self, Azure, "Azure SPN Exposure", category=IdentityTheft, vid="KHV004")
|
||||
self.container = container
|
||||
|
||||
@handler.subscribe(ExposedRunHandler, predicate=lambda x: x.cloud=="Azure")
|
||||
@handler.subscribe(ExposedRunHandler, predicate=lambda x: x.cloud==CloudTypes.AKS)
|
||||
class AzureSpnHunter(Hunter):
|
||||
"""AKS Hunting
|
||||
Hunting Azure cluster deployments using specific known configurations
|
||||
|
||||
@@ -2,41 +2,65 @@ import requests_mock
|
||||
import time
|
||||
from queue import Empty
|
||||
|
||||
from src.modules.discovery.hosts import FromPodHostDiscovery, RunningAsPodEvent, HostScanEvent, AzureMetadataApi
|
||||
from src.modules.discovery.aks import AzureHostDiscovery, AzureMetadataApi
|
||||
from src.modules.discovery.hosts import HostScanEvent, RunningPodOnCloud, FromPodHostDiscovery
|
||||
from src.core.events.types import Event, NewHostEvent
|
||||
from src.core.events import handler
|
||||
from src.core.types import CloudTypes
|
||||
|
||||
from __main__ import config
|
||||
|
||||
def test_FromPodHostDiscovery():
|
||||
# global variables for cloud discovery check
|
||||
aws_triggered = False
|
||||
azure_triggered = False
|
||||
|
||||
def test_AzureHostDiscovery():
|
||||
config.remote = None
|
||||
config.cidr = None
|
||||
config.pod = True
|
||||
with requests_mock.Mocker() as m:
|
||||
e = RunningAsPodEvent()
|
||||
|
||||
config.azure = False
|
||||
config.remote = None
|
||||
config.cidr = None
|
||||
m.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", status_code=404)
|
||||
f = FromPodHostDiscovery(e)
|
||||
assert not f.is_azure_pod()
|
||||
# TODO For now we don't test the traceroute discovery version
|
||||
# f.execute()
|
||||
f = FromPodHostDiscovery(HostScanEvent())
|
||||
m.get("https://canhazip.com", text="1.2.3.4")
|
||||
m.get("https://www.azurespeed.com/api/region?ipOrUrl=1.2.3.4", text="""{
|
||||
"cloud": "Azure",
|
||||
"regionId": null,
|
||||
"region": null,
|
||||
"location": null,
|
||||
"ipAddress": "1.2.3.4"}""")
|
||||
|
||||
# Test that we generate NewHostEvent for the addresses reported by the Azure Metadata API
|
||||
config.azure = True
|
||||
m.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", \
|
||||
text='{"network":{"interface":[{"ipv4":{"subnet":[{"address": "3.4.5.6", "prefix": "255.255.255.252"}]}}]}}')
|
||||
assert f.is_azure_pod()
|
||||
|
||||
f.execute()
|
||||
time.sleep(0.1)
|
||||
assert azure_triggered
|
||||
|
||||
# Test that we don't trigger a HostScanEvent unless either config.remote or config.cidr are configured
|
||||
config.remote = "1.2.3.4"
|
||||
def test_AWSPodDiscovery():
|
||||
with requests_mock.Mocker() as m:
|
||||
e = HostScanEvent()
|
||||
config.remote = None
|
||||
config.cidr = None
|
||||
config.pod = True
|
||||
|
||||
f = FromPodHostDiscovery(e)
|
||||
m.get("https://canhazip.com", text="1.2.3.4")
|
||||
m.get("https://www.azurespeed.com/api/region?ipOrUrl=1.2.3.4", text="""{
|
||||
"cloud": "AWS",
|
||||
"regionId": null,
|
||||
"region": null,
|
||||
"location": null,
|
||||
"ipAddress": "1.2.3.4"}""")
|
||||
f.execute()
|
||||
time.sleep(0.1)
|
||||
assert aws_triggered
|
||||
|
||||
config.azure = False
|
||||
config.remote = None
|
||||
config.cidr = "1.2.3.4/24"
|
||||
f.execute()
|
||||
|
||||
@handler.subscribe(RunningPodOnCloud, predicate = lambda x: x.cloud == CloudTypes.EKS)
|
||||
class testAWSCloud(object):
|
||||
def __init__(self, event):
|
||||
global aws_triggered
|
||||
aws_triggered = True
|
||||
|
||||
# In this set of tests we should only trigger HostScanEvent when remote or cidr are set
|
||||
@handler.subscribe(HostScanEvent)
|
||||
@@ -44,16 +68,14 @@ class testHostDiscovery(object):
|
||||
def __init__(self, event):
|
||||
assert config.remote is not None or config.cidr is not None
|
||||
assert config.remote == "1.2.3.4" or config.cidr == "1.2.3.4/24"
|
||||
|
||||
# In this set of tests we should only get as far as finding a host if it's Azure
|
||||
# because we're not running the code that would normally be triggered by a HostScanEvent
|
||||
|
||||
@handler.subscribe(NewHostEvent)
|
||||
class testHostDiscoveryEvent(object):
|
||||
class testNewHostEvent(object):
|
||||
def __init__(self, event):
|
||||
assert config.azure
|
||||
assert str(event.host).startswith("3.4.5.")
|
||||
assert config.remote is None
|
||||
assert config.cidr is None
|
||||
if event.cloud == CloudTypes.AKS:
|
||||
global azure_triggered
|
||||
azure_triggered = True
|
||||
assert not str(event.host).startswith("3.4.5")
|
||||
|
||||
# Test that we only report this event for Azure hosts
|
||||
@handler.subscribe(AzureMetadataApi)
|
||||
|
||||
Reference in New Issue
Block a user