Files
kube-hunter/tests/core/test_subscribe.py
Yehuda Chikvashvili fe3dba90d8 Refactor configuration (#283)
* Remove __main__ references and create a top-level config module

* Move conf module into separate standalone package

* Deprecate install_imports.py script

* Rename root package to kube_hunter

The previous src root package name was too generic and not unique,
so it can be used as external name.
Change `src` to `kube_hunter` so it can be referenced in a clear way.
Addtional changes made on the way:
* Make imports absolute
* Formatting

Relates to #185

* remove todos

Co-authored-by: Ryan Lahfa <masterancpp@gmail.com>
Co-authored-by: Itay Shakury <itay@itaysk.com>
2019-12-29 14:18:58 +02:00

50 lines
1.2 KiB
Python

import time
from kube_hunter.core.types import Hunter
from kube_hunter.core.events.types import Event, Service
from kube_hunter.core.events import handler
counter = 0
class OnceOnlyEvent(Service, Event):
def __init__(self):
Service.__init__(self, "Test Once Service")
class RegularEvent(Service, Event):
def __init__(self):
Service.__init__(self, "Test Service")
@handler.subscribe_once(OnceOnlyEvent)
class OnceHunter(Hunter):
def __init__(self, event):
global counter
counter += 1
@handler.subscribe(RegularEvent)
class RegularHunter(Hunter):
def __init__(self, event):
global counter
counter += 1
def test_subscribe_mechanism():
global counter
# first test normal subscribe and publish works
handler.publish_event(RegularEvent())
handler.publish_event(RegularEvent())
handler.publish_event(RegularEvent())
time.sleep(0.02)
assert counter == 3
counter = 0
# testing the subscribe_once mechanism
handler.publish_event(OnceOnlyEvent())
handler.publish_event(OnceOnlyEvent())
handler.publish_event(OnceOnlyEvent())
time.sleep(0.02)
# should have been triggered once
assert counter == 1