collect ERROR and CRITICAL logs and send to elastic search (#1147) (#1150)

* collect ERROR and CRITICAL logs and send to elastic search

Signed-off-by: ddjain <darjain@redhat.com>

* bump up krkn-lib to 6.0.3

Signed-off-by: ddjain <darjain@redhat.com>

---------

Signed-off-by: ddjain <darjain@redhat.com>
This commit is contained in:
Darshan Jain
2026-02-18 18:26:14 +05:30
committed by GitHub
parent b6ef7fa052
commit 2065443622
5 changed files with 111 additions and 7 deletions

View File

@@ -0,0 +1,71 @@
import logging
import threading
from datetime import datetime, timezone
from krkn.utils.ErrorLog import ErrorLog
class ErrorCollectionHandler(logging.Handler):
    """
    Logging handler that records ERROR and CRITICAL messages in memory.

    Each qualifying record is turned into an ``ErrorLog`` entry (ISO-8601
    UTC timestamp plus the formatted message) so the collected data can
    later be attached to chaos-run telemetry. Every read and write of the
    internal list is guarded by a lock, so the handler is safe to share
    across threads.
    """

    def __init__(self, level=logging.ERROR):
        """
        Create the handler.

        Args:
            level: minimum log level the handler registers for
                (default: ERROR)
        """
        super().__init__(level)
        # Collected entries, in arrival order.
        self.error_logs: list[ErrorLog] = []
        # Guards all access to ``error_logs``.
        self._logs_lock = threading.Lock()

    def emit(self, record: logging.LogRecord):
        """
        Store ``record`` as an ErrorLog when it is ERROR level or above.

        Args:
            record: LogRecord delivered by the logging framework
        """
        try:
            # Guard clause: drop anything below ERROR (40), keeping only
            # ERROR and CRITICAL (50) even if attached more broadly.
            if record.levelno < logging.ERROR:
                return
            created = datetime.fromtimestamp(record.created, tz=timezone.utc)
            # ISO-8601 UTC with millisecond precision,
            # e.g. 2024-01-01T00:00:00.123Z ([:-3] trims µs to ms).
            stamp = "{}Z".format(created.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3])
            entry = ErrorLog(timestamp=stamp, message=record.getMessage())
            with self._logs_lock:
                self.error_logs.append(entry)
        except Exception:
            # Logging convention: a handler must never let errors propagate.
            self.handleError(record)

    def get_error_logs(self) -> list[dict]:
        """
        Return every collected entry as a dictionary.

        Returns:
            List of dicts, each holding ``timestamp`` and ``message``.
        """
        with self._logs_lock:
            return [entry.to_dict() for entry in self.error_logs]

    def clear(self):
        """Drop all collected entries (handy for tests)."""
        with self._logs_lock:
            self.error_logs.clear()

18
krkn/utils/ErrorLog.py Normal file
View File

@@ -0,0 +1,18 @@
from dataclasses import dataclass, asdict
@dataclass
class ErrorLog:
    """
    One captured error-log entry destined for telemetry.

    Attributes:
        timestamp: ISO-8601 formatted UTC timestamp string
        message: the fully formatted error message text
    """

    timestamp: str
    message: str

    def to_dict(self) -> dict:
        """Return a plain dict (field name -> value) for JSON serialization."""
        # Equivalent to dataclasses.asdict(self); both fields are plain strings.
        return {"timestamp": self.timestamp, "message": self.message}

View File

@@ -1,2 +1,4 @@
from .TeeLogHandler import TeeLogHandler
from .ErrorLog import ErrorLog
from .ErrorCollectionHandler import ErrorCollectionHandler
from .functions import *

View File

@@ -17,7 +17,7 @@ ibm_vpc==0.26.3 # Requires ibm_cloud_sdk_core
jinja2==3.1.6
lxml==5.1.0
kubernetes==34.1.0
krkn-lib==6.0.2
krkn-lib==6.0.3
numpy==1.26.4
pandas==2.2.0
openshift-client==1.0.21

View File

@@ -28,7 +28,7 @@ from krkn_lib.models.telemetry import ChaosRunTelemetry
from krkn_lib.utils import SafeLogger
from krkn_lib.utils.functions import get_yaml_item_value, get_junit_test_case
from krkn.utils import TeeLogHandler
from krkn.utils import TeeLogHandler, ErrorCollectionHandler
from krkn.utils.HealthChecker import HealthChecker
from krkn.utils.VirtChecker import VirtChecker
from krkn.scenario_plugins.scenario_plugin_factory import (
@@ -426,16 +426,22 @@ def main(options, command: Optional[str]) -> int:
logging.info("collecting Kubernetes cluster metadata....")
telemetry_k8s.collect_cluster_metadata(chaos_telemetry)
# Collect error logs from handler
error_logs = error_collection_handler.get_error_logs()
if error_logs:
logging.info(f"Collected {len(error_logs)} error logs for telemetry")
chaos_telemetry.error_logs = error_logs
else:
logging.info("No error logs collected during chaos run")
chaos_telemetry.error_logs = []
telemetry_json = chaos_telemetry.to_json()
decoded_chaos_run_telemetry = ChaosRunTelemetry(json.loads(telemetry_json))
chaos_output.telemetry = decoded_chaos_run_telemetry
logging.info(f"Chaos data:\n{chaos_output.to_json()}")
if enable_elastic:
elastic_telemetry = ElasticChaosRunTelemetry(
chaos_run_telemetry=decoded_chaos_run_telemetry
)
result = elastic_search.push_telemetry(
elastic_telemetry, elastic_telemetry_index
decoded_chaos_run_telemetry, elastic_telemetry_index
)
if result == -1:
safe_logger.error(
@@ -660,7 +666,14 @@ if __name__ == "__main__":
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(colored)
tee_handler.setFormatter(plain)
handlers = [file_handler, stream_handler, tee_handler]
error_collection_handler = ErrorCollectionHandler(level=logging.ERROR)
handlers = [
file_handler,
stream_handler,
tee_handler,
error_collection_handler,
]
logging.basicConfig(
level=logging.DEBUG if options.debug else logging.INFO,