diff --git a/tests/test_ingress_network_plugin.py b/tests/test_ingress_network_plugin.py index 201f02da..04b7ebd6 100644 --- a/tests/test_ingress_network_plugin.py +++ b/tests/test_ingress_network_plugin.py @@ -1,44 +1,60 @@ import unittest -import os -import os import logging from arcaflow_plugin_sdk import plugin -import kraken.plugins.network.kubernetes_functions as kube_helper from kraken.plugins.network import ingress_shaping + class NetworkScenariosTest(unittest.TestCase): def test_serialization(self): plugin.test_object_serialization( - ingress_shaping.NetworkScenarioConfig(node_interface_name={"foo": ['bar']}, network_params={ - "latency": "50ms" , "loss": "0.02", "bandwidth": "100mbit"}), + ingress_shaping.NetworkScenarioConfig( + node_interface_name={"foo": ['bar']}, + network_params={ + "latency": "50ms", + "loss": "0.02", + "bandwidth": "100mbit" + } + ), self.fail, ) plugin.test_object_serialization( ingress_shaping.NetworkScenarioSuccessOutput( - filter_direction="ingress", test_interfaces= {"foo": ['bar']}, network_parameters={ - "latency": "50ms" , "loss": "0.02", "bandwidth": "100mbit"}, execution_type="parallel"), + filter_direction="ingress", + test_interfaces={"foo": ['bar']}, + network_parameters={ + "latency": "50ms", + "loss": "0.02", + "bandwidth": "100mbit" + }, + execution_type="parallel"), self.fail, ) plugin.test_object_serialization( ingress_shaping.NetworkScenarioErrorOutput( - error="Hello World", + error="Hello World", ), self.fail, ) - def test_network_chaos(self): - output_id, output_data = ingress_shaping.network_chaos( ingress_shaping.NetworkScenarioConfig( - label_selector="node-role.kubernetes.io/master", instance_count=1, - network_params = {"latency": "50ms" , "loss": "0.02", "bandwidth": "100mbit"} + label_selector="node-role.kubernetes.io/master", + instance_count=1, + network_params={ + "latency": "50ms", + "loss": "0.02", + "bandwidth": "100mbit" + } ) ) if output_id == "error": logging.error(output_data.error) - self.fail("The network chaos scenario did not complete successfully because an error/exception occurred") + self.fail( + "The network chaos scenario did not complete successfully " + "because an error/exception occurred" + ) if __name__ == "__main__":