collect ERROR and CRITICAL logs and send to elastic search

Signed-off-by: ddjain <darjain@redhat.com>
This commit is contained in:
ddjain
2026-02-03 14:48:45 +05:30
parent a9f1ce8f1b
commit e65cebeee4
4 changed files with 105 additions and 5 deletions

View File

@@ -0,0 +1,71 @@
import logging
import threading
from datetime import datetime, timezone
from krkn.utils.ErrorLog import ErrorLog
class ErrorCollectionHandler(logging.Handler):
"""
Custom logging handler that captures ERROR and CRITICAL level logs
in structured format for telemetry collection.
Stores logs in memory as ErrorLog objects for later retrieval.
Thread-safe for concurrent logging operations.
"""
def __init__(self, level=logging.ERROR):
"""
Initialize the error collection handler.
Args:
level: Minimum log level to capture (default: ERROR)
"""
super().__init__(level)
self.error_logs: list[ErrorLog] = []
self._lock = threading.Lock()
def emit(self, record: logging.LogRecord):
"""
Capture ERROR and CRITICAL logs and store as ErrorLog objects.
Args:
record: LogRecord from Python logging framework
"""
try:
# Only capture ERROR (40) and CRITICAL (50) levels
if record.levelno < logging.ERROR:
return
# Format timestamp as ISO 8601 UTC
timestamp = datetime.fromtimestamp(
record.created, tz=timezone.utc
).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
# Create ErrorLog object
error_log = ErrorLog(
timestamp=timestamp,
message=record.getMessage()
)
# Thread-safe append
with self._lock:
self.error_logs.append(error_log)
except Exception:
# Handler should never raise exceptions (logging best practice)
self.handleError(record)
def get_error_logs(self) -> list[dict]:
"""
Retrieve all collected error logs as list of dictionaries.
Returns:
List of error log dictionaries with timestamp and message
"""
with self._lock:
return [log.to_dict() for log in self.error_logs]
def clear(self):
"""Clear all collected error logs (useful for testing)"""
with self._lock:
self.error_logs.clear()

18
krkn/utils/ErrorLog.py Normal file
View File

@@ -0,0 +1,18 @@
from dataclasses import dataclass, asdict
@dataclass
class ErrorLog:
"""
Represents a single error log entry for telemetry collection.
Attributes:
timestamp: ISO 8601 formatted timestamp (UTC)
message: Full error message text
"""
timestamp: str
message: str
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization"""
return asdict(self)

View File

@@ -1,2 +1,4 @@
from .TeeLogHandler import TeeLogHandler
from .ErrorLog import ErrorLog
from .ErrorCollectionHandler import ErrorCollectionHandler
from .functions import *

View File

@@ -27,7 +27,7 @@ from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
from krkn.utils import TeeLogHandler
from krkn.utils import TeeLogHandler, ErrorCollectionHandler
from krkn.utils.HealthChecker import HealthChecker
from krkn.utils.VirtChecker import VirtChecker
from krkn.scenario_plugins.scenario_plugin_factory import (
@@ -425,16 +425,22 @@ def main(options, command: Optional[str]) -> int:
logging.info("collecting Kubernetes cluster metadata....")
telemetry_k8s.collect_cluster_metadata(chaos_telemetry)
# Collect error logs from handler
error_logs = error_collection_handler.get_error_logs()
if error_logs:
logging.info(f"Collected {len(error_logs)} error logs for telemetry")
chaos_telemetry.error_logs = error_logs
else:
logging.info("No error logs collected during chaos run")
chaos_telemetry.error_logs = []
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
elastic_telemetry = ElasticChaosRunTelemetry(
chaos_run_telemetry=decoded_chaos_run_telemetry
)
result = elastic_search.push_telemetry(
elastic_telemetry, elastic_telemetry_index
decoded_chaos_run_telemetry, elastic_telemetry_index
)
if result == -1:
safe_logger.error(
@@ -646,10 +652,13 @@ if __name__ == "__main__":
# If no command or regular execution, continue with existing logic
report_file = options.output
tee_handler = TeeLogHandler()
error_collection_handler = ErrorCollectionHandler(level=logging.ERROR)
handlers = [
logging.FileHandler(report_file, mode="w"),
logging.StreamHandler(),
tee_handler,
error_collection_handler,
]
logging.basicConfig(