Files
krkn/tests/test_ingress_network_plugin.py
Tullio Sebastiani d91172d9b2 Core Refactoring, Krkn Scenario Plugin API (#694)
* relocated shared libraries from `kraken` to `krkn` folder

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* AbstractScenarioPlugin and ScenarioPluginFactory

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* application_outage porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* arcaflow_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* managedcluster_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* network_chaos porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* node_actions porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* plugin_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* pvc_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* service_disruption porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* service_hijacking porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* cluster_shut_down_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* syn_flood porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* time_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* zone_outages porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* ScenarioPluginFactory tests

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* unit tests update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* pod_scenarios and post actions deprecated

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

scenarios post_actions

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* funtests and config update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* run_krkn.py update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* utils porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* API Documentation

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* container_scenarios porting

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* funtest fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* document gif update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* Documentation + tests update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* removed example plugin

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* global renaming

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

test fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

test fix

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* config.yaml typos

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

typos

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* removed `plugin_scenarios` from NativScenarioPlugin class

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* pod_network_scenarios type added

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* documentation update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

* krkn-lib update

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

typo

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>

---------

Signed-off-by: Tullio Sebastiani <tsebasti@redhat.com>
2024-10-03 20:48:04 +02:00

65 lines
2.0 KiB
Python

import unittest
import logging
from arcaflow_plugin_sdk import plugin
from krkn.scenario_plugins.native.network import ingress_shaping
class NetworkScenariosTest(unittest.TestCase):
def test_serialization(self):
plugin.test_object_serialization(
ingress_shaping.NetworkScenarioConfig(
node_interface_name={"foo": ["bar"]},
network_params={
"latency": "50ms",
"loss": "0.02",
"bandwidth": "100mbit",
},
),
self.fail,
)
plugin.test_object_serialization(
ingress_shaping.NetworkScenarioSuccessOutput(
filter_direction="ingress",
test_interfaces={"foo": ["bar"]},
network_parameters={
"latency": "50ms",
"loss": "0.02",
"bandwidth": "100mbit",
},
execution_type="parallel",
),
self.fail,
)
plugin.test_object_serialization(
ingress_shaping.NetworkScenarioErrorOutput(
error="Hello World",
),
self.fail,
)
def test_network_chaos(self):
output_id, output_data = ingress_shaping.network_chaos(
params=ingress_shaping.NetworkScenarioConfig(
label_selector="node-role.kubernetes.io/control-plane",
instance_count=1,
network_params={
"latency": "50ms",
"loss": "0.02",
"bandwidth": "100mbit",
},
),
run_id="network-shaping-test",
)
if output_id == "error":
logging.error(output_data.error)
self.fail(
"The network chaos scenario did not complete successfully "
"because an error/exception occurred"
)
if __name__ == "__main__":
unittest.main()