mirror of
https://github.com/aquasecurity/kube-hunter.git
synced 2026-05-08 18:26:48 +00:00
Merge pull request #94 from aquasecurity/hosts-test
Add tests for host discovery
This commit is contained in:
@@ -3,7 +3,8 @@ language: python
|
||||
cache: pip
|
||||
matrix:
|
||||
allow_failures:
|
||||
- python: 2.7
|
||||
# Python 3 tests are failing due to an import error
|
||||
- python: 3.6
|
||||
include:
|
||||
- python: 2.7
|
||||
#- python: 3.4
|
||||
|
||||
@@ -6,3 +6,4 @@ requests
|
||||
PrettyTable
|
||||
urllib3
|
||||
ruamel.yaml
|
||||
requests_mock
|
||||
|
||||
@@ -16,7 +16,7 @@ parser.add_argument('--report', type=str, default='plain', help="set report type
|
||||
config = parser.parse_args()
|
||||
|
||||
def main():
|
||||
pytest.main(['.'])
|
||||
exit(pytest.main(['.']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
62
src/modules/discovery/test_hosts.py
Normal file
62
src/modules/discovery/test_hosts.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import requests_mock
|
||||
import time
|
||||
from Queue import Empty
|
||||
|
||||
from hosts import FromPodHostDiscovery, RunningAsPodEvent, HostScanEvent, AzureMetadataApi
|
||||
from ...core.events.types import Event, NewHostEvent
|
||||
from ...core.events import handler
|
||||
from __main__ import config
|
||||
|
||||
def test_FromPodHostDiscovery():
|
||||
|
||||
with requests_mock.Mocker() as m:
|
||||
e = RunningAsPodEvent()
|
||||
|
||||
config.azure = False
|
||||
config.remote = None
|
||||
config.cidr = None
|
||||
m.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", status_code=404)
|
||||
f = FromPodHostDiscovery(e)
|
||||
assert not f.is_azure_pod()
|
||||
# TODO For now we don't test the traceroute discovery version
|
||||
# f.execute()
|
||||
|
||||
# Test that we generate NewHostEvent for the addresses reported by the Azure Metadata API
|
||||
config.azure = True
|
||||
m.get("http://169.254.169.254/metadata/instance?api-version=2017-08-01", \
|
||||
text='{"network":{"interface":[{"ipv4":{"subnet":[{"address": "3.4.5.6", "prefix": "255.255.255.252"}]}}]}}')
|
||||
assert f.is_azure_pod()
|
||||
f.execute()
|
||||
|
||||
# Test that we don't trigger a HostScanEvent unless either config.remote or config.cidr are configured
|
||||
config.remote = "1.2.3.4"
|
||||
f.execute()
|
||||
|
||||
config.azure = False
|
||||
config.remote = None
|
||||
config.cidr = "1.2.3.4/24"
|
||||
f.execute()
|
||||
|
||||
|
||||
# In this set of tests we should only trigger HostScanEvent when remote or cidr are set
|
||||
@handler.subscribe(HostScanEvent)
|
||||
class testHostDiscovery(object):
|
||||
def __init__(self, event):
|
||||
assert config.remote is not None or config.cidr is not None
|
||||
assert config.remote == "1.2.3.4" or config.cidr == "1.2.3.4/24"
|
||||
|
||||
# In this set of tests we should only get as far as finding a host if it's Azure
|
||||
# because we're not running the code that would normally be triggered by a HostScanEvent
|
||||
@handler.subscribe(NewHostEvent)
|
||||
class testHostDiscoveryEvent(object):
|
||||
def __init__(self, event):
|
||||
assert config.azure
|
||||
assert str(event.host).startswith("3.4.5.")
|
||||
assert config.remote is None
|
||||
assert config.cidr is None
|
||||
|
||||
# Test that we only report this event for Azure hosts
|
||||
@handler.subscribe(AzureMetadataApi)
|
||||
class testAzureMetadataApi(object):
|
||||
def __init__(self, event):
|
||||
assert config.azure
|
||||
Reference in New Issue
Block a user