From dfc3a1d71664def68df0a94ee4ce6186faccc74b Mon Sep 17 00:00:00 2001 From: Sahil Shah Date: Thu, 9 Apr 2026 10:47:50 -0400 Subject: [PATCH] Adding http load scenario (#1160) Signed-off-by: Sahil Shah --- config/config.yaml | 2 + krkn/scenario_plugins/http_load/__init__.py | 0 .../http_load/http_load_scenario_plugin.py | 563 ++++++++++++++++++ scenarios/kube/http_load_scenario.yml | 29 + tests/test_http_load_scenario_plugin.py | 440 ++++++++++++++ 5 files changed, 1034 insertions(+) create mode 100644 krkn/scenario_plugins/http_load/__init__.py create mode 100644 krkn/scenario_plugins/http_load/http_load_scenario_plugin.py create mode 100644 scenarios/kube/http_load_scenario.yml create mode 100644 tests/test_http_load_scenario_plugin.py diff --git a/config/config.yaml b/config/config.yaml index 580f79de..8dbb6679 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -55,6 +55,8 @@ kraken: - scenarios/kube/node_interface_down.yaml - kubevirt_vm_outage: - scenarios/kubevirt/kubevirt-vm-outage.yaml + - http_load_scenarios: + - scenarios/kube/http_load_scenario.yml resiliency: resiliency_run_mode: standalone # Options: standalone, detailed, disabled diff --git a/krkn/scenario_plugins/http_load/__init__.py b/krkn/scenario_plugins/http_load/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/krkn/scenario_plugins/http_load/http_load_scenario_plugin.py b/krkn/scenario_plugins/http_load/http_load_scenario_plugin.py new file mode 100644 index 00000000..c55dfedb --- /dev/null +++ b/krkn/scenario_plugins/http_load/http_load_scenario_plugin.py @@ -0,0 +1,563 @@ +import base64 +import json +import logging +import time +from typing import Dict, List, Any + +import yaml +from krkn_lib.models.telemetry import ScenarioTelemetry +from krkn_lib.telemetry.ocp import KrknTelemetryOpenshift +from krkn_lib.utils import get_random_string + +from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin +from krkn.rollback.config import 
from krkn.rollback.config import RollbackContent
from krkn.rollback.handler import set_rollback_context_decorator


class HttpLoadScenarioPlugin(AbstractScenarioPlugin):
    """
    HTTP load-testing scenario plugin built around Vegeta.

    Deploys one or more Vegeta attacker pods inside the target Kubernetes
    cluster so load is generated from within the cluster (optionally pinned
    to specific nodes via label selectors), waits for the attack to finish,
    then collects and aggregates the per-pod JSON reports into the scenario
    telemetry.
    """

    def __init__(self, scenario_type: str = "http_load_scenarios"):
        super().__init__(scenario_type=scenario_type)

    @set_rollback_context_decorator
    def run(
        self,
        run_uuid: str,
        scenario: str,
        lib_telemetry: KrknTelemetryOpenshift,
        scenario_telemetry: ScenarioTelemetry,
    ) -> int:
        """
        Main entry point for HTTP load scenario execution.

        :param run_uuid: unique identifier for this chaos run
        :param scenario: path to the scenario YAML configuration file
        :param lib_telemetry: telemetry wrapper exposing the Kubernetes client
        :param scenario_telemetry: telemetry collector for this scenario
        :return: 0 on success, 1 on failure
        """
        try:
            # safe_load instead of full_load: scenario files are plain data
            # and must not be able to construct arbitrary Python objects.
            with open(scenario, "r") as f:
                scenario_configs = yaml.safe_load(f)

            if not scenario_configs:
                logging.error("Empty scenario configuration file")
                return 1

            # Tolerate a single top-level mapping as well as the documented
            # list-of-mappings layout.
            if isinstance(scenario_configs, dict):
                scenario_configs = [scenario_configs]

            for scenario_config in scenario_configs:
                if not isinstance(scenario_config, dict):
                    logging.error(f"Invalid scenario configuration format: {scenario_config}")
                    return 1

                # The payload may be nested under `http_load_scenario`
                # or given directly at the top level.
                config = scenario_config.get("http_load_scenario", scenario_config)

                if not self._validate_config(config):
                    return 1

                result = self._execute_distributed_load_test(
                    config, lib_telemetry, scenario_telemetry
                )
                if result != 0:
                    return result

            logging.info("HTTP load test completed successfully")
            return 0

        except Exception as e:
            logging.error(f"HTTP load scenario failed with exception: {e}")
            import traceback
            logging.error(traceback.format_exc())
            return 1

    def get_scenario_types(self) -> list[str]:
        """Return the scenario type keys this plugin handles."""
        return ["http_load_scenarios"]

    def _validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate the scenario configuration.

        Requires a `targets.endpoints` non-empty list where every endpoint
        is a mapping with `url` and `method`; `rate` and `duration`, when
        present, must be strings (e.g. "200/1s", "30s") or integers.

        :param config: scenario configuration dictionary
        :return: True if valid, False otherwise
        """
        if "targets" not in config:
            logging.error("Missing required field: targets")
            return False

        targets = config["targets"]
        if "endpoints" not in targets:
            logging.error("targets must contain 'endpoints'")
            return False

        # Fix: the original re-tested `"endpoints" in targets` a second time
        # right after the check above; the dead re-check is removed.
        endpoints = targets["endpoints"]
        if not isinstance(endpoints, list) or len(endpoints) == 0:
            logging.error("endpoints must be a non-empty list")
            return False

        for idx, endpoint in enumerate(endpoints):
            if not isinstance(endpoint, dict):
                logging.error(f"Endpoint {idx} must be a dictionary")
                return False
            if "url" not in endpoint:
                logging.error(f"Endpoint {idx} missing required field: url")
                return False
            if "method" not in endpoint:
                logging.error(f"Endpoint {idx} missing required field: method")
                return False

        if "rate" in config and not isinstance(config["rate"], (str, int)):
            logging.error("rate must be a string (e.g., '200/1s') or integer")
            return False

        if "duration" in config and not isinstance(config["duration"], (str, int)):
            logging.error("duration must be a string (e.g., '30s') or integer")
            return False

        return True

    def _execute_distributed_load_test(
        self,
        config: Dict[str, Any],
        lib_telemetry: KrknTelemetryOpenshift,
        scenario_telemetry: ScenarioTelemetry,
    ) -> int:
        """
        Execute a distributed HTTP load test by deploying Vegeta pods.

        :param config: scenario configuration
        :param lib_telemetry: telemetry wrapper exposing the Kubernetes client
        :param scenario_telemetry: telemetry collector for recording results
        :return: 0 on success, 1 on failure
        """
        pod_names = []
        namespace = config.get("namespace", "default")

        try:
            number_of_pods = config.get("number-of-pods", 1)
            image = config.get("image", "quay.io/krkn-chaos/krkn-http-load:latest")

            endpoints = config.get("targets", {}).get("endpoints", [])
            if not endpoints:
                logging.error("No endpoints specified in targets")
                return 1

            # One newline-delimited JSON target list is shared by every pod;
            # Vegeta round-robins across the entries.
            targets_json = self._build_vegeta_json_targets(endpoints)
            targets_json_base64 = base64.b64encode(targets_json.encode()).decode()

            target_urls = [ep["url"] for ep in endpoints]
            logging.info(f"Targeting {len(endpoints)} endpoint(s): {target_urls}")

            # Node affinity labels for scheduling the attacker pods.
            node_selectors = config.get("attacker-nodes")

            logging.info(f"Deploying {number_of_pods} HTTP load testing pod(s)")

            for i in range(number_of_pods):
                pod_name = f"http-load-{get_random_string(10)}"
                logging.info(f"Deploying pod {i+1}/{number_of_pods}: {pod_name}")

                lib_telemetry.get_lib_kubernetes().deploy_http_load(
                    name=pod_name,
                    namespace=namespace,
                    image=image,
                    targets_json_base64=targets_json_base64,
                    duration=config.get("duration", "30s"),
                    rate=config.get("rate", "50/1s"),
                    workers=config.get("workers", 10),
                    max_workers=config.get("max_workers", 100),
                    connections=config.get("connections", 100),
                    timeout=config.get("timeout", "10s"),
                    keepalive=config.get("keepalive", True),
                    http2=config.get("http2", True),
                    insecure=config.get("insecure", False),
                    node_selectors=node_selectors,
                    timeout_sec=500,
                )
                pod_names.append(pod_name)

            # Pod names travel base64-encoded inside the rollback content's
            # string resource_identifier field.
            # NOTE(review): rollback is only registered after ALL pods
            # deployed; a mid-loop deploy failure would leak the pods created
            # so far. Confirm whether set_rollback_callable may be invoked
            # incrementally per pod.
            rollback_data = base64.b64encode(json.dumps(pod_names).encode('utf-8')).decode('utf-8')
            self.rollback_handler.set_rollback_callable(
                self.rollback_http_load_pods,
                RollbackContent(
                    namespace=namespace,
                    resource_identifier=rollback_data,
                ),
            )

            logging.info(f"Successfully deployed {len(pod_names)} HTTP load pod(s)")

            logging.info("Waiting for all HTTP load pods to complete...")
            self._wait_for_pods_completion(pod_names, namespace, lib_telemetry, config)

            # Collect and merge per-pod Vegeta reports.
            metrics = self._collect_and_aggregate_results(pod_names, namespace, lib_telemetry)
            if metrics:
                self._log_metrics_summary(metrics)
                scenario_telemetry.additional_telemetry = metrics

            logging.info("HTTP load test completed successfully")
            return 0

        except Exception as e:
            logging.error(f"Error executing distributed load test: {e}")
            import traceback
            logging.error(traceback.format_exc())
            return 1

    def _build_vegeta_json_targets(self, endpoints: List[Dict[str, Any]]) -> str:
        """
        Build newline-delimited Vegeta JSON targets from all endpoints.

        Each line is one JSON object:
        {"method":"GET","url":"...","header":{...},"body":"base64..."} —
        the shape Vegeta's JSON target format expects; Vegeta round-robins
        across the lines.

        :param endpoints: list of endpoint configurations
        :return: newline-delimited JSON string
        """
        lines = []
        for ep in endpoints:
            target = {
                "method": ep.get("method", "GET"),
                "url": ep["url"],
            }

            # Vegeta headers are lists of values per key.
            if "headers" in ep and ep["headers"]:
                target["header"] = {k: [v] for k, v in ep["headers"].items()}

            # Bodies must be base64 encoded in the JSON target format.
            if "body" in ep and ep["body"]:
                target["body"] = base64.b64encode(ep["body"].encode()).decode()

            lines.append(json.dumps(target, separators=(",", ":")))

        return "\n".join(lines)
# NOTE(review): the following are methods of HttpLoadScenarioPlugin,
# reconstructed from the whitespace-mangled patch text. Functional changes:
# `_parse_metrics_from_logs` now advertises its real Optional return type,
# and `_aggregate_metrics` reports true max/min latencies instead of a
# request-weighted average of the per-pod extremes.
def _wait_for_pods_completion(
    self,
    pod_names: List[str],
    namespace: str,
    lib_telemetry: "KrknTelemetryOpenshift",
    config: Dict[str, Any],
):
    """
    Block until every HTTP load pod has stopped running.

    Polls pod state every 5 seconds and gives up after the configured
    attack duration plus a 60 second buffer.

    :param pod_names: list of pod names to wait for
    :param namespace: namespace where the pods are running
    :param lib_telemetry: telemetry wrapper exposing the Kubernetes client
    :param config: scenario configuration (read for `duration`)
    """
    lib_k8s = lib_telemetry.get_lib_kubernetes()
    finished_pods = []
    did_finish = False

    # Timeout budget: attack duration + 60s grace for scheduling/teardown.
    duration_str = config.get("duration", "30s")
    max_wait = self._parse_duration_to_seconds(duration_str) + 60
    start_time = time.time()

    while not did_finish:
        for pod_name in pod_names:
            if pod_name not in finished_pods:
                # A pod that is no longer "running" has completed (or failed).
                if not lib_k8s.is_pod_running(pod_name, namespace):
                    finished_pods.append(pod_name)
                    logging.info(f"Pod {pod_name} has completed")

        if set(pod_names) == set(finished_pods):
            did_finish = True
            break

        if time.time() - start_time > max_wait:
            logging.warning(f"Timeout waiting for pods to complete (waited {max_wait}s)")
            break

        time.sleep(5)

    logging.info(f"All {len(finished_pods)}/{len(pod_names)} pods have completed")

def _collect_and_aggregate_results(
    self,
    pod_names: List[str],
    namespace: str,
    lib_telemetry: "KrknTelemetryOpenshift",
) -> Dict[str, Any]:
    """
    Read each pod's logs, parse its Vegeta JSON report, and merge them.

    Pods whose logs cannot be read or parsed are skipped with a warning,
    so partial results are still reported.

    :param pod_names: list of pod names
    :param namespace: namespace where the pods ran
    :param lib_telemetry: telemetry wrapper exposing the Kubernetes client
    :return: aggregated metrics dictionary ({} when nothing was collected)
    """
    lib_k8s = lib_telemetry.get_lib_kubernetes()
    all_metrics = []

    logging.info("Collecting results from HTTP load pods...")

    for pod_name in pod_names:
        try:
            log_response = lib_k8s.get_pod_log(pod_name, namespace)

            # get_pod_log may hand back an HTTPResponse-like object
            # (with .data or .read()) or a plain string.
            if hasattr(log_response, 'data'):
                logs = log_response.data.decode('utf-8') if isinstance(log_response.data, bytes) else str(log_response.data)
            elif hasattr(log_response, 'read'):
                logs = log_response.read().decode('utf-8')
            else:
                logs = str(log_response)

            metrics = self._parse_metrics_from_logs(logs)
            if metrics:
                all_metrics.append(metrics)
                logging.info(f"Collected metrics from pod: {pod_name}")
            else:
                logging.warning(f"No metrics found in logs for pod: {pod_name}")

        except Exception as e:
            logging.warning(f"Failed to collect results from pod {pod_name}: {e}")

    if not all_metrics:
        logging.warning("No metrics collected from any pods")
        return {}

    aggregated = self._aggregate_metrics(all_metrics)
    logging.info(f"Aggregated metrics from {len(all_metrics)} pod(s)")

    return aggregated

def _parse_metrics_from_logs(self, logs: str) -> "Optional[Dict[str, Any]]":
    """
    Extract the Vegeta JSON report from a pod's log output.

    Scans line by line for a JSON object containing a "latencies" key —
    the shape the load container's JSON report emits.

    :param logs: full pod log text
    :return: parsed metrics dict, or None when no report line is found.
        (Fix: the annotation previously promised a plain Dict even though
        every miss returns None; kept as a string annotation so no new
        typing import is required.)
    """
    try:
        for line in logs.split('\n'):
            line = line.strip()
            if line.startswith('{') and '"latencies"' in line:
                return json.loads(line)
        return None
    except Exception as e:
        logging.warning(f"Failed to parse metrics from logs: {e}")
        return None

def _aggregate_metrics(self, metrics_list: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Merge per-pod Vegeta reports into one cluster-wide report.

    Totals (requests, rate, throughput, bytes, status codes) are summed;
    success and the averaged latency buckets are weighted by each pod's
    request count; max/min latencies take the true extremes across pods
    (fix: they previously used a request-weighted average, which
    understates the real max and overstates the real min).

    :param metrics_list: one Vegeta report dict per pod
    :return: aggregated metrics, or {} for an empty input list
    """
    if not metrics_list:
        return {}

    # Additive totals.
    total_requests = sum(m.get("requests", 0) for m in metrics_list)
    total_rate = sum(m.get("rate", 0) for m in metrics_list)
    total_throughput = sum(m.get("throughput", 0) for m in metrics_list)

    latencies = {}
    if total_requests > 0:
        # Mean/percentiles: request-weighted average. This is an
        # approximation — exact percentile merging would need raw samples.
        for percentile in ["mean", "50th", "95th", "99th"]:
            weighted_sum = sum(
                m.get("latencies", {}).get(percentile, 0) * m.get("requests", 0)
                for m in metrics_list
            )
            latencies[percentile] = weighted_sum / total_requests

        # Extremes: the global max/min is the max/min over pods.
        latencies["max"] = max(
            (m.get("latencies", {}).get("max", 0) for m in metrics_list),
            default=0,
        )
        reported_mins = [
            m["latencies"]["min"]
            for m in metrics_list
            if m.get("latencies", {}).get("min") is not None
        ]
        latencies["min"] = min(reported_mins) if reported_mins else 0

    # Success rate: request-weighted average of per-pod success fractions.
    total_success = sum(
        m.get("success", 0) * m.get("requests", 0) for m in metrics_list
    )
    success_rate = total_success / total_requests if total_requests > 0 else 0

    # Status code histogram: per-code counter merge.
    status_codes = {}
    for metrics in metrics_list:
        for code, count in metrics.get("status_codes", {}).items():
            status_codes[code] = status_codes.get(code, 0) + count

    bytes_in_total = sum(m.get("bytes_in", {}).get("total", 0) for m in metrics_list)
    bytes_out_total = sum(m.get("bytes_out", {}).get("total", 0) for m in metrics_list)

    all_errors = []
    for metrics in metrics_list:
        all_errors.extend(metrics.get("errors", []))

    return {
        "requests": total_requests,
        "rate": total_rate,
        "throughput": total_throughput,
        "latencies": latencies,
        "success": success_rate,
        "status_codes": status_codes,
        "bytes_in": {"total": bytes_in_total},
        "bytes_out": {"total": bytes_out_total},
        "errors": all_errors[:10],  # cap stored errors at the first 10
        "pod_count": len(metrics_list),
    }
"rate": total_rate, + "throughput": total_throughput, + "latencies": latencies, + "success": success_rate, + "status_codes": status_codes, + "bytes_in": {"total": bytes_in_total}, + "bytes_out": {"total": bytes_out_total}, + "errors": all_errors[:10], # First 10 errors only + "pod_count": len(metrics_list) + } + + def _parse_duration_to_seconds(self, duration: str) -> int: + """ + Parse duration string to seconds. + + :param duration: Duration string like "30s", "5m", "1h" + :return: Duration in seconds + """ + import re + + match = re.match(r'^(\d+)(s|m|h)$', str(duration)) + if not match: + logging.warning(f"Invalid duration format: {duration}, defaulting to 30s") + return 30 + + value = int(match.group(1)) + unit = match.group(2) + + multipliers = { + "s": 1, + "m": 60, + "h": 3600, + } + + return value * multipliers.get(unit, 1) + + @staticmethod + def rollback_http_load_pods( + rollback_content: RollbackContent, + lib_telemetry: KrknTelemetryOpenshift + ): + """ + Rollback function to delete HTTP load pods. 
+ + :param rollback_content: Rollback content containing namespace and pod names + :param lib_telemetry: Instance of KrknTelemetryOpenshift for Kubernetes operations + """ + try: + namespace = rollback_content.namespace + pod_names = json.loads( + base64.b64decode(rollback_content.resource_identifier.encode('utf-8')).decode('utf-8') + ) + + logging.info(f"Rolling back HTTP load pods: {pod_names} in namespace: {namespace}") + + for pod_name in pod_names: + try: + lib_telemetry.get_lib_kubernetes().delete_pod(pod_name, namespace) + logging.info(f"Deleted pod: {pod_name}") + except Exception as e: + logging.warning(f"Failed to delete pod {pod_name}: {e}") + + logging.info("Rollback of HTTP load pods completed") + except Exception as e: + logging.error(f"Failed to rollback HTTP load pods: {e}") + + def _log_metrics_summary(self, metrics: Dict[str, Any]): + """Log summary of test metrics.""" + logging.info("=" * 60) + logging.info("HTTP Load Test Results Summary (Aggregated)") + logging.info("=" * 60) + + # Pod count + pod_count = metrics.get("pod_count", 1) + logging.info(f"Load Generator Pods: {pod_count}") + + # Request statistics + requests = metrics.get("requests", 0) + logging.info(f"Total Requests: {requests}") + + # Success rate + success = metrics.get("success", 0.0) + logging.info(f"Success Rate: {success * 100:.2f}%") + + # Latency statistics + latencies = metrics.get("latencies", {}) + if latencies: + logging.info(f"Latency Mean: {latencies.get('mean', 0) / 1e6:.2f} ms") + logging.info(f"Latency P50: {latencies.get('50th', 0) / 1e6:.2f} ms") + logging.info(f"Latency P95: {latencies.get('95th', 0) / 1e6:.2f} ms") + logging.info(f"Latency P99: {latencies.get('99th', 0) / 1e6:.2f} ms") + logging.info(f"Latency Max: {latencies.get('max', 0) / 1e6:.2f} ms") + + # Throughput + throughput = metrics.get("throughput", 0.0) + logging.info(f"Total Throughput: {throughput:.2f} req/s") + + # Bytes + bytes_in = metrics.get("bytes_in", {}) + bytes_out = 
metrics.get("bytes_out", {}) + if bytes_in: + logging.info(f"Bytes In (total): {bytes_in.get('total', 0) / 1024 / 1024:.2f} MB") + if bytes_out: + logging.info(f"Bytes Out (total): {bytes_out.get('total', 0) / 1024 / 1024:.2f} MB") + + # Status codes + status_codes = metrics.get("status_codes", {}) + if status_codes: + logging.info("Status Code Distribution:") + for code, count in sorted(status_codes.items()): + logging.info(f" {code}: {count}") + + # Errors + errors = metrics.get("errors", []) + if errors: + logging.warning(f"Errors encountered: {len(errors)}") + for error in errors[:5]: # Show first 5 errors + logging.warning(f" - {error}") + + logging.info("=" * 60) diff --git a/scenarios/kube/http_load_scenario.yml b/scenarios/kube/http_load_scenario.yml new file mode 100644 index 00000000..6ea83312 --- /dev/null +++ b/scenarios/kube/http_load_scenario.yml @@ -0,0 +1,29 @@ +- http_load_scenario: + runs: 1 # number of times to execute the scenario + number-of-pods: 2 # number of attacker pods instantiated + namespace: default # namespace to deploy load testing pods + image: quay.io/krkn-chaos/krkn-http-load:latest # http load attacker container image + attacker-nodes: # node affinity to schedule the attacker pods, per each node label selector + node-role.kubernetes.io/worker: # can be specified multiple values so the kube scheduler will schedule the attacker pods + - "" # in the best way possible based on the provided labels. 
# --- scenarios/kube/http_load_scenario.yml --------------------------------
# - http_load_scenario:
#     runs: 1                        # number of times to execute the scenario
#     number-of-pods: 2              # number of attacker pods instantiated
#     namespace: default             # namespace to deploy load testing pods
#     image: quay.io/krkn-chaos/krkn-http-load:latest  # attacker container image
#     attacker-nodes:                # node affinity for the attacker pods;
#       node-role.kubernetes.io/worker:  # multiple labels/values may be given,
#         - ""                       # the kube scheduler places pods accordingly.
#     # set empty value `attacker-nodes: {}` to let kubernetes schedule the pods
#     targets:                       # Vegeta round-robins across all endpoints
#       endpoints:                   # methods: GET, POST, PUT, DELETE, PATCH, HEAD
#         - url: "https://your-service.example.com/health"
#           method: "GET"
#         - url: "https://your-service.example.com/api/data"
#           method: "POST"
#           headers:
#             Content-Type: "application/json"
#             Authorization: "Bearer your-token"
#           body: '{"key":"value"}'
#     rate: "50/1s"                  # per-pod rate: "50/1s", "1000/1m", "0" = max
#     duration: "30s"                # attack duration: "30s", "5m", "1h"
#     workers: 10                    # initial concurrent workers per pod
#     max_workers: 100               # maximum workers per pod (auto-scales)
#     connections: 100               # max idle connections per host
#     timeout: "10s"                 # per-request timeout
#     keepalive: true                # use persistent HTTP connections
#     http2: true                    # enable HTTP/2
#     insecure: false                # skip TLS verification (self-signed certs)

# --- tests/test_http_load_scenario_plugin.py ------------------------------
#!/usr/bin/env python3

"""
Test suite for HttpLoadScenarioPlugin class

Usage:
    python -m coverage run -a -m unittest tests/test_http_load_scenario_plugin.py -v
"""

import base64
import json
import tempfile
import unittest
import uuid
from pathlib import Path
from unittest.mock import MagicMock

import yaml

from krkn.rollback.config import RollbackContent
from krkn.scenario_plugins.http_load.http_load_scenario_plugin import HttpLoadScenarioPlugin


class TestHttpLoadScenarioPlugin(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_get_scenario_types(self):
        """The plugin advertises exactly one scenario type."""
        scenario_types = self.plugin.get_scenario_types()

        self.assertEqual(scenario_types, ["http_load_scenarios"])
        self.assertEqual(len(scenario_types), 1)


class TestValidateConfig(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_valid_config_single_endpoint(self):
        cfg = {
            "targets": {
                "endpoints": [
                    {"url": "https://example.com/api", "method": "GET"}
                ]
            },
            "rate": "50/1s",
            "duration": "30s",
        }
        self.assertTrue(self.plugin._validate_config(cfg))

    def test_valid_config_multiple_endpoints(self):
        cfg = {
            "targets": {
                "endpoints": [
                    {"url": "https://example.com/health", "method": "GET"},
                    {"url": "https://example.com/api", "method": "POST",
                     "headers": {"Content-Type": "application/json"},
                     "body": '{"key":"value"}'},
                ]
            }
        }
        self.assertTrue(self.plugin._validate_config(cfg))

    def test_missing_targets(self):
        self.assertFalse(
            self.plugin._validate_config({"rate": "50/1s", "duration": "30s"})
        )

    def test_missing_endpoints(self):
        self.assertFalse(self.plugin._validate_config({"targets": {}}))

    def test_empty_endpoints_list(self):
        self.assertFalse(
            self.plugin._validate_config({"targets": {"endpoints": []}})
        )

    def test_endpoint_missing_url(self):
        cfg = {"targets": {"endpoints": [{"method": "GET"}]}}
        self.assertFalse(self.plugin._validate_config(cfg))

    def test_endpoint_missing_method(self):
        cfg = {"targets": {"endpoints": [{"url": "https://example.com"}]}}
        self.assertFalse(self.plugin._validate_config(cfg))

    def test_invalid_endpoint_not_dict(self):
        cfg = {"targets": {"endpoints": ["https://example.com"]}}
        self.assertFalse(self.plugin._validate_config(cfg))


class TestBuildVegetaJsonTargets(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_single_get_endpoint(self):
        endpoints = [{"url": "https://example.com/api", "method": "GET"}]
        rendered = self.plugin._build_vegeta_json_targets(endpoints)

        target = json.loads(rendered)
        self.assertEqual(target["method"], "GET")
        self.assertEqual(target["url"], "https://example.com/api")

    def test_endpoint_with_headers(self):
        endpoints = [{
            "url": "https://example.com/api",
            "method": "GET",
            "headers": {"Authorization": "Bearer token123", "X-Custom": "value"},
        }]
        rendered = self.plugin._build_vegeta_json_targets(endpoints)

        target = json.loads(rendered)
        self.assertIn("header", target)
        self.assertEqual(target["header"]["Authorization"], ["Bearer token123"])
        self.assertEqual(target["header"]["X-Custom"], ["value"])

    def test_endpoint_with_body(self):
        endpoints = [{
            "url": "https://example.com/api",
            "method": "POST",
            "headers": {"Content-Type": "application/json"},
            "body": '{"key":"value"}',
        }]
        rendered = self.plugin._build_vegeta_json_targets(endpoints)

        target = json.loads(rendered)
        self.assertIn("body", target)
        # Vegeta JSON targets carry bodies base64-encoded.
        self.assertEqual(base64.b64decode(target["body"]).decode(), '{"key":"value"}')

    def test_multiple_endpoints_newline_delimited(self):
        endpoints = [
            {"url": "https://example.com/health", "method": "GET"},
            {"url": "https://example.com/api", "method": "POST"},
        ]
        rendered = self.plugin._build_vegeta_json_targets(endpoints)

        lines = rendered.strip().split("\n")
        self.assertEqual(len(lines), 2)

        first, second = json.loads(lines[0]), json.loads(lines[1])
        self.assertEqual(first["method"], "GET")
        self.assertEqual(second["method"], "POST")


class TestParseDurationToSeconds(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_seconds(self):
        self.assertEqual(self.plugin._parse_duration_to_seconds("30s"), 30)

    def test_minutes(self):
        self.assertEqual(self.plugin._parse_duration_to_seconds("5m"), 300)

    def test_hours(self):
        self.assertEqual(self.plugin._parse_duration_to_seconds("1h"), 3600)

    def test_invalid_format_defaults_to_30(self):
        self.assertEqual(self.plugin._parse_duration_to_seconds("invalid"), 30)

    def test_integer_input(self):
        self.assertEqual(self.plugin._parse_duration_to_seconds(30), 30)


class TestAggregateMetrics(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_single_pod_metrics(self):
        reports = [{
            "requests": 1000,
            "rate": 50.0,
            "throughput": 49.5,
            "success": 0.99,
            "latencies": {"mean": 50000000, "50th": 45000000, "95th": 80000000,
                          "99th": 100000000, "max": 150000000, "min": 1000000},
            "status_codes": {"200": 990, "500": 10},
            "bytes_in": {"total": 1024000},
            "bytes_out": {"total": 512000},
            "errors": ["connection refused"],
        }]
        merged = self.plugin._aggregate_metrics(reports)

        self.assertEqual(merged["requests"], 1000)
        self.assertEqual(merged["pod_count"], 1)
        self.assertAlmostEqual(merged["success"], 0.99)

    def test_multiple_pod_metrics(self):
        reports = [
            {"requests": 500, "rate": 25.0, "throughput": 24.5,
             "success": 1.0, "latencies": {"mean": 40000000},
             "status_codes": {"200": 500}, "bytes_in": {"total": 512000},
             "bytes_out": {"total": 256000}, "errors": []},
            {"requests": 500, "rate": 25.0, "throughput": 24.5,
             "success": 0.98, "latencies": {"mean": 60000000},
             "status_codes": {"200": 490, "500": 10}, "bytes_in": {"total": 512000},
             "bytes_out": {"total": 256000}, "errors": ["timeout"]},
        ]
        merged = self.plugin._aggregate_metrics(reports)

        self.assertEqual(merged["requests"], 1000)
        self.assertEqual(merged["rate"], 50.0)
        self.assertEqual(merged["pod_count"], 2)
        self.assertEqual(merged["status_codes"]["200"], 990)
        self.assertEqual(merged["status_codes"]["500"], 10)

    def test_empty_metrics_list(self):
        self.assertEqual(self.plugin._aggregate_metrics([]), {})


class TestParseMetricsFromLogs(unittest.TestCase):

    def setUp(self):
        self.plugin = HttpLoadScenarioPlugin()

    def test_valid_json_report(self):
        logs = (
            "=== Krkn HTTP Load Scenario ===\n"
            "RATE: 50/1s\n"
            "=== JSON Report ===\n"
            '{"requests":1000,"latencies":{"mean":50000000},"success":0.99}\n'
            "Attack completed successfully\n"
        )
        parsed = self.plugin._parse_metrics_from_logs(logs)

        self.assertIsNotNone(parsed)
        self.assertEqual(parsed["requests"], 1000)

    def test_no_json_in_logs(self):
        logs = "=== Krkn HTTP Load Scenario ===\nno json here\n"
        self.assertIsNone(self.plugin._parse_metrics_from_logs(logs))


class TestHttpLoadRun(unittest.TestCase):

    def _create_scenario_file(self, tmp_dir, config=None):
        scenario = [{
            "http_load_scenario": {
                "targets": {
                    "endpoints": [
                        {"url": "https://example.com/api", "method": "GET"}
                    ]
                },
                "rate": "50/1s",
                "duration": "10s",
                "namespace": "default",
                "number-of-pods": 1,
                "image": "quay.io/krkn-chaos/krkn-http-load:latest",
            }
        }]
        if config:
            scenario[0]["http_load_scenario"].update(config)

        scenario_file = Path(tmp_dir) / "test_scenario.yaml"
        with open(scenario_file, "w") as f:
            yaml.dump(scenario, f)
        return str(scenario_file)

    def _create_mocks(self):
        mock_lib_telemetry = MagicMock()
        mock_lib_kubernetes = MagicMock()
        mock_lib_telemetry.get_lib_kubernetes.return_value = mock_lib_kubernetes
        mock_scenario_telemetry = MagicMock()
        return mock_lib_telemetry, mock_lib_kubernetes, mock_scenario_telemetry

    def test_run_successful(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            scenario_file = self._create_scenario_file(tmp_dir)
            mock_lib_telemetry, mock_lib_kubernetes, mock_scenario_telemetry = (
                self._create_mocks()
            )

            mock_lib_kubernetes.is_pod_running.return_value = False
            mock_lib_kubernetes.get_pod_log.return_value = (
                '{"requests":100,"latencies":{"mean":50000000},"success":1.0,'
                '"rate":50.0,"throughput":49.5,"status_codes":{"200":100},'
                '"bytes_in":{"total":1024},"bytes_out":{"total":512},"errors":[]}'
            )

            plugin = HttpLoadScenarioPlugin()
            rc = plugin.run(
                run_uuid=str(uuid.uuid4()),
                scenario=scenario_file,
                lib_telemetry=mock_lib_telemetry,
                scenario_telemetry=mock_scenario_telemetry,
            )

            self.assertEqual(rc, 0)
            mock_lib_kubernetes.deploy_http_load.assert_called_once()

    def test_run_multiple_pods(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            scenario_file = self._create_scenario_file(tmp_dir, {"number-of-pods": 3})
            mock_lib_telemetry, mock_lib_kubernetes, mock_scenario_telemetry = (
                self._create_mocks()
            )

            mock_lib_kubernetes.is_pod_running.return_value = False
            mock_lib_kubernetes.get_pod_log.return_value = (
                '{"requests":100,"latencies":{"mean":50000000},"success":1.0,'
                '"rate":50.0,"throughput":49.5,"status_codes":{"200":100},'
                '"bytes_in":{"total":1024},"bytes_out":{"total":512},"errors":[]}'
            )

            plugin = HttpLoadScenarioPlugin()
            rc = plugin.run(
                run_uuid=str(uuid.uuid4()),
                scenario=scenario_file,
                lib_telemetry=mock_lib_telemetry,
                scenario_telemetry=mock_scenario_telemetry,
            )

            self.assertEqual(rc, 0)
            self.assertEqual(mock_lib_kubernetes.deploy_http_load.call_count, 3)

    def test_run_invalid_config(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            scenario_file = Path(tmp_dir) / "bad_scenario.yaml"
            with open(scenario_file, "w") as f:
                yaml.dump([{"http_load_scenario": {"invalid": "config"}}], f)

            mock_lib_telemetry, mock_lib_kubernetes, mock_scenario_telemetry = (
                self._create_mocks()
            )

            plugin = HttpLoadScenarioPlugin()
            rc = plugin.run(
                run_uuid=str(uuid.uuid4()),
                scenario=str(scenario_file),
                lib_telemetry=mock_lib_telemetry,
                scenario_telemetry=mock_scenario_telemetry,
            )

            self.assertEqual(rc, 1)
            mock_lib_kubernetes.deploy_http_load.assert_not_called()

    def test_run_deploy_exception(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            scenario_file = self._create_scenario_file(tmp_dir)
            mock_lib_telemetry, mock_lib_kubernetes, mock_scenario_telemetry = (
                self._create_mocks()
            )

            mock_lib_kubernetes.deploy_http_load.side_effect = Exception("Deploy failed")

            plugin = HttpLoadScenarioPlugin()
            rc = plugin.run(
                run_uuid=str(uuid.uuid4()),
                scenario=scenario_file,
                lib_telemetry=mock_lib_telemetry,
                scenario_telemetry=mock_scenario_telemetry,
            )

            self.assertEqual(rc, 1)


class TestRollbackHttpLoadPods(unittest.TestCase):

    def test_rollback_successful(self):
        pod_names = ["http-load-abc123", "http-load-def456"]
        encoded = base64.b64encode(
            json.dumps(pod_names).encode("utf-8")
        ).decode("utf-8")

        rollback_content = RollbackContent(
            resource_identifier=encoded,
            namespace="default",
        )

        mock_lib_telemetry = MagicMock()
        mock_lib_kubernetes = MagicMock()
        mock_lib_telemetry.get_lib_kubernetes.return_value = mock_lib_kubernetes

        HttpLoadScenarioPlugin.rollback_http_load_pods(
            rollback_content, mock_lib_telemetry
        )

        self.assertEqual(mock_lib_kubernetes.delete_pod.call_count, 2)
        mock_lib_kubernetes.delete_pod.assert_any_call("http-load-abc123", "default")
        mock_lib_kubernetes.delete_pod.assert_any_call("http-load-def456", "default")

    def test_rollback_empty_list(self):
        encoded = base64.b64encode(
            json.dumps([]).encode("utf-8")
        ).decode("utf-8")

        rollback_content = RollbackContent(
            resource_identifier=encoded,
            namespace="default",
        )

        mock_lib_telemetry = MagicMock()
        mock_lib_kubernetes = MagicMock()
        mock_lib_telemetry.get_lib_kubernetes.return_value = mock_lib_kubernetes

        HttpLoadScenarioPlugin.rollback_http_load_pods(
            rollback_content, mock_lib_telemetry
        )

        mock_lib_kubernetes.delete_pod.assert_not_called()

    def test_rollback_invalid_data(self):
        rollback_content = RollbackContent(
            resource_identifier="invalid_base64_data",
            namespace="default",
        )

        mock_lib_telemetry = MagicMock()
        mock_lib_kubernetes = MagicMock()
        mock_lib_telemetry.get_lib_kubernetes.return_value = mock_lib_kubernetes

        with self.assertLogs(level='ERROR') as log_context:
            HttpLoadScenarioPlugin.rollback_http_load_pods(
                rollback_content, mock_lib_telemetry
            )

        self.assertTrue(any('error' in entry.lower() for entry in log_context.output))
        mock_lib_kubernetes.delete_pod.assert_not_called()


if __name__ == "__main__":
    unittest.main()