[Rollback Scenario] Refactor execution (#895)
Some checks failed
Functional & Unit Tests / Functional & Unit Tests (push) Failing after 9m28s
Functional & Unit Tests / Generate Coverage Badge (push) Has been skipped

* Validate version file format

* Add validation for context dir, Exexcute all files by default

* Consolidate execute and cleanup, rename with .executed instead of
removing

* Respect auto_rollback config

* Add cleanup back but only for scenario successed

---------

Co-authored-by: Tullio Sebastiani <tsebastiani@users.noreply.github.com>
This commit is contained in:
LIU ZHE YOU
2025-11-19 21:14:06 +08:00
committed by GitHub
parent f4bdbff9dc
commit 959337eb63
5 changed files with 312 additions and 71 deletions

View File

@@ -3,7 +3,7 @@ import logging
from typing import Optional, TYPE_CHECKING
from krkn.rollback.config import RollbackConfig
from krkn.rollback.handler import execute_rollback_version_files, cleanup_rollback_version_files
from krkn.rollback.handler import execute_rollback_version_files
@@ -96,24 +96,16 @@ def execute_rollback(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: Optional
:return: Exit code (0 for success, 1 for error)
"""
logging.info("Executing rollback version files")
if not run_uuid:
logging.error("run_uuid is required for execute-rollback command")
return 1
if not scenario_type:
logging.warning("scenario_type is not specified, executing all scenarios in rollback directory")
logging.info(f"Executing rollback for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
try:
# Execute rollback version files
logging.info(f"Executing rollback for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
execute_rollback_version_files(telemetry_ocp, run_uuid, scenario_type)
# If execution was successful, cleanup the version files
logging.info("Rollback execution completed successfully, cleaning up version files")
cleanup_rollback_version_files(run_uuid, scenario_type)
logging.info("Rollback execution and cleanup completed successfully")
execute_rollback_version_files(
telemetry_ocp,
run_uuid,
scenario_type,
ignore_auto_rollback_config=True
)
return 0
except Exception as e:

View File

@@ -108,7 +108,76 @@ class RollbackConfig(metaclass=SingletonMeta):
return f"{cls().versions_directory}/{rollback_context}"
@classmethod
def search_rollback_version_files(cls, run_uuid: str, scenario_type: str | None = None) -> list[str]:
def is_rollback_version_file_format(cls, file_name: str, expected_scenario_type: str | None = None) -> bool:
"""
Validate the format of a rollback version file name.
Expected format: <scenario_type>_<timestamp>_<hash_suffix>.py
where:
- scenario_type: string (can include underscores)
- timestamp: integer (nanoseconds since epoch)
- hash_suffix: alphanumeric string (length 8)
- .py: file extension
:param file_name: The name of the file to validate.
:param expected_scenario_type: The expected scenario type (if any) to validate against.
:return: True if the file name matches the expected format, False otherwise.
"""
if not file_name.endswith(".py"):
return False
parts = file_name.split("_")
if len(parts) < 3:
return False
scenario_type = "_".join(parts[:-2])
timestamp_str = parts[-2]
hash_suffix_with_ext = parts[-1]
hash_suffix = hash_suffix_with_ext[:-3]
if expected_scenario_type and scenario_type != expected_scenario_type:
return False
if not timestamp_str.isdigit():
return False
if len(hash_suffix) != 8 or not hash_suffix.isalnum():
return False
return True
@classmethod
def is_rollback_context_directory_format(cls, directory_name: str, expected_run_uuid: str | None = None) -> bool:
"""
Validate the format of a rollback context directory name.
Expected format: <timestamp>-<run_uuid>
where:
- timestamp: integer (nanoseconds since epoch)
- run_uuid: alphanumeric string
:param directory_name: The name of the directory to validate.
:param expected_run_uuid: The expected run UUID (if any) to validate against.
:return: True if the directory name matches the expected format, False otherwise.
"""
parts = directory_name.split("-", 1)
if len(parts) != 2:
return False
timestamp_str, run_uuid = parts
# Validate timestamp is numeric
if not timestamp_str.isdigit():
return False
# Validate run_uuid
if expected_run_uuid and expected_run_uuid != run_uuid:
return False
return True
@classmethod
def search_rollback_version_files(cls, run_uuid: str | None = None, scenario_type: str | None = None) -> list[str]:
"""
Search for rollback version files based on run_uuid and scenario_type.
@@ -123,34 +192,31 @@ class RollbackConfig(metaclass=SingletonMeta):
if not os.path.exists(cls().versions_directory):
return []
rollback_context_directories = [
dirname for dirname in os.listdir(cls().versions_directory) if run_uuid in dirname
]
rollback_context_directories = []
for dir in os.listdir(cls().versions_directory):
if cls.is_rollback_context_directory_format(dir, run_uuid):
rollback_context_directories.append(dir)
else:
logger.warning(f"Directory {dir} does not match expected pattern of <timestamp>-<run_uuid>")
if not rollback_context_directories:
logger.warning(f"No rollback context directories found for run UUID {run_uuid}")
return []
if len(rollback_context_directories) > 1:
logger.warning(
f"Expected one directory for run UUID {run_uuid}, found: {rollback_context_directories}"
)
rollback_context_directory = rollback_context_directories[0]
version_files = []
scenario_rollback_versions_directory = os.path.join(
cls().versions_directory, rollback_context_directory
)
for file in os.listdir(scenario_rollback_versions_directory):
# assert all files start with scenario_type and end with .py
if file.endswith(".py") and (scenario_type is None or file.startswith(scenario_type)):
version_files.append(
os.path.join(scenario_rollback_versions_directory, file)
)
else:
logger.warning(
f"File {file} does not match expected pattern for scenario type {scenario_type}"
)
for rollback_context_dir in rollback_context_directories:
rollback_context_dir = os.path.join(cls().versions_directory, rollback_context_dir)
for file in os.listdir(rollback_context_dir):
if cls.is_rollback_version_file_format(file, scenario_type):
version_files.append(
os.path.join(rollback_context_dir, file)
)
else:
logger.warning(
f"File {file} does not match expected pattern of <{scenario_type or '*'}>_<timestamp>_<hash_suffix>.py"
)
return version_files
@dataclass(frozen=True)

View File

@@ -117,23 +117,33 @@ def _parse_rollback_module(version_file_path: str) -> tuple[RollbackCallable, Ro
return rollback_callable, rollback_content
def execute_rollback_version_files(telemetry_ocp: "KrknTelemetryOpenshift", run_uuid: str, scenario_type: str | None = None):
def execute_rollback_version_files(
telemetry_ocp: "KrknTelemetryOpenshift",
run_uuid: str | None = None,
scenario_type: str | None = None,
ignore_auto_rollback_config: bool = False
):
"""
Execute rollback version files for the given run_uuid and scenario_type.
This function is called when a signal is received to perform rollback operations.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
:param ignore_auto_rollback_config: Flag to ignore auto rollback configuration. Will be set to True for manual execute-rollback calls.
"""
if not ignore_auto_rollback_config and RollbackConfig.auto is False:
logger.warning(f"Auto rollback is disabled, skipping execution for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
return
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip execution for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
logger.warning(f"Skip execution for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
return
# Execute all version files in the directory
logger.info(f"Executing rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
logger.info(f"Executing rollback version files for run_uuid={run_uuid or '*'}, scenario_type={scenario_type or '*'}")
for version_file in version_files:
try:
logger.info(f"Executing rollback version file: {version_file}")
@@ -144,28 +154,37 @@ def execute_rollback_version_files(telemetry_ocp: "KrknTelemetryOpenshift", run_
logger.info('Executing rollback callable...')
rollback_callable(rollback_content, telemetry_ocp)
logger.info('Rollback completed.')
logger.info(f"Executed {version_file} successfully.")
success = True
except Exception as e:
success = False
logger.error(f"Failed to execute rollback version file {version_file}: {e}")
raise
# Rename the version file with .executed suffix if successful
if success:
try:
executed_file = f"{version_file}.executed"
os.rename(version_file, executed_file)
logger.info(f"Renamed {version_file} to {executed_file} successfully.")
except Exception as e:
logger.error(f"Failed to rename rollback version file {version_file}: {e}")
raise
def cleanup_rollback_version_files(run_uuid: str, scenario_type: str):
"""
Cleanup rollback version files for the given run_uuid and scenario_type.
This function is called to remove the rollback version files after execution.
This function is called to remove the rollback version files after successful scenario execution in run_scenarios.
:param run_uuid: Unique identifier for the run.
:param scenario_type: Type of the scenario being rolled back.
"""
# Get the rollback versions directory
version_files = RollbackConfig.search_rollback_version_files(run_uuid, scenario_type)
if not version_files:
logger.warning(f"Skip cleanup for run_uuid={run_uuid}, scenario_type={scenario_type or '*'}")
return
# Remove all version files in the directory
logger.info(f"Cleaning up rollback version files for run_uuid={run_uuid}, scenario_type={scenario_type}")
for version_file in version_files:
@@ -176,7 +195,6 @@ def cleanup_rollback_version_files(run_uuid: str, scenario_type: str):
logger.error(f"Failed to remove rollback version file {version_file}: {e}")
raise
class RollbackHandler:
def __init__(
self,

View File

@@ -115,14 +115,15 @@ class AbstractScenarioPlugin(ABC):
)
return_value = 1
# execute rollback files based on the return value
if return_value != 0:
if return_value == 0:
cleanup_rollback_version_files(
run_uuid, scenario_telemetry.scenario_type
)
else:
# execute rollback files based on the return value
execute_rollback_version_files(
telemetry, run_uuid, scenario_telemetry.scenario_type
)
cleanup_rollback_version_files(
run_uuid, scenario_telemetry.scenario_type
)
scenario_telemetry.exit_status = return_value
scenario_telemetry.end_timestamp = time.time()
utils.collect_and_put_ocp_logs(

View File

@@ -58,25 +58,23 @@ class TestRollbackScenarioPlugin:
for vf in version_files
]
def execute_version_file(self, version_file: str):
def execute_version_file(self, version_file: str, telemetry_ocp: KrknTelemetryOpenshift):
"""
Execute a rollback version file using subprocess.
Execute a rollback version file using the new importlib approach.
:param version_file: The path to the version file to execute.
"""
print(f"Executing rollback version file: {version_file}")
result = subprocess.run(
[sys.executable, version_file],
capture_output=True,
text=True,
)
assert result.returncode == 0, (
f"Rollback version file {version_file} failed with return code {result.returncode}. "
f"Output: {result.stdout}, Error: {result.stderr}"
)
print(
f"Rollback version file executed successfully: {version_file} with output: {result.stdout}"
)
try:
from krkn.rollback.handler import _parse_rollback_module
rollback_callable, rollback_content = _parse_rollback_module(version_file)
rollback_callable(rollback_content, telemetry_ocp)
print(f"Rollback version file executed successfully: {version_file}")
except Exception as e:
raise AssertionError(
f"Rollback version file {version_file} failed with error: {e}"
)
@pytest.fixture(autouse=True)
def setup_logging(self):
@@ -130,7 +128,11 @@ class TestRollbackScenarioPlugin:
)
@pytest.mark.usefixtures("setup_rollback_config")
def test_simple_rollback_scenario_plugin(self, lib_telemetry, scenario_telemetry):
def test_simple_rollback_scenario_plugin(
self,
lib_telemetry: KrknTelemetryOpenshift,
scenario_telemetry: ScenarioTelemetry,
):
from tests.rollback_scenario_plugins.simple import SimpleRollbackScenarioPlugin
scenario_type = "simple_rollback_scenario"
@@ -157,4 +159,166 @@ class TestRollbackScenarioPlugin:
)
# Execute the rollback version file
for version_file in version_files:
self.execute_version_file(version_file)
self.execute_version_file(version_file, lib_telemetry)
class TestRollbackConfig:
@pytest.mark.parametrize("directory_name,run_uuid,expected", [
("123456789-abcdefgh", "abcdefgh", True),
("123456789-abcdefgh", None, True),
("123456789-abcdefgh", "ijklmnop", False),
("123456789-", "abcdefgh", False),
("-abcdefgh", "abcdefgh", False),
("123456789-abcdefgh-ijklmnop", "abcdefgh", False),
])
def test_is_rollback_context_directory_format(self, directory_name, run_uuid, expected):
assert RollbackConfig.is_rollback_context_directory_format(directory_name, run_uuid) == expected
@pytest.mark.parametrize("file_name,expected", [
("simple_rollback_scenario_123456789_abcdefgh.py", True),
("simple_rollback_scenario_123456789_abcdefgh.py.executed", False),
("simple_rollback_scenario_123456789_abc.py", False),
("simple_rollback_scenario_123456789_abcdefgh.txt", False),
("simple_rollback_scenario_123456789_.py", False),
])
def test_is_rollback_version_file_format(self, file_name, expected):
assert RollbackConfig.is_rollback_version_file_format(file_name) == expected
class TestRollbackCommand:
@pytest.mark.parametrize("auto_rollback", [True, False], ids=["enabled_rollback", "disabled_rollback"])
@pytest.mark.parametrize("encounter_exception", [True, False], ids=["no_exception", "with_exception"])
def test_execute_rollback_command_ignore_auto_rollback_config(self, auto_rollback, encounter_exception):
"""Test execute_rollback function with different auto rollback configurations."""
from krkn.rollback.command import execute_rollback
from krkn.rollback.config import RollbackConfig
from unittest.mock import Mock, patch
# Create mock telemetry
mock_telemetry = Mock()
# Mock search_rollback_version_files to return some test files
mock_version_files = [
"/tmp/test_versions/123456789-test-uuid/scenario_123456789_abcdefgh.py",
"/tmp/test_versions/123456789-test-uuid/scenario_123456789_ijklmnop.py"
]
with (
patch.object(RollbackConfig, 'auto', auto_rollback) as _,
patch.object(RollbackConfig, 'search_rollback_version_files', return_value=mock_version_files) as mock_search,
patch('krkn.rollback.command.execute_rollback_version_files') as mock_execute
):
if encounter_exception:
mock_execute.side_effect = Exception("Test exception")
# Call the function
result = execute_rollback(
telemetry_ocp=mock_telemetry,
run_uuid="test-uuid",
scenario_type="scenario"
)
# Verify return code
assert result == 0 if not encounter_exception else 1
# Verify that execute_rollback_version_files was called with correct parameters
mock_execute.assert_called_once_with(
mock_telemetry,
"test-uuid",
"scenario",
ignore_auto_rollback_config=True
)
class TestRollbackAbstractScenarioPlugin:
@pytest.mark.parametrize("auto_rollback", [True, False], ids=["enabled_rollback", "disabled_rollback"])
@pytest.mark.parametrize("scenario_should_fail", [True, False], ids=["failing_scenario", "successful_scenario"])
def test_run_scenarios_respect_auto_rollback_config(self, auto_rollback, scenario_should_fail):
"""Test that run_scenarios respects the auto rollback configuration."""
from krkn.scenario_plugins.abstract_scenario_plugin import AbstractScenarioPlugin
from krkn.rollback.config import RollbackConfig
from unittest.mock import Mock, patch
# Create a test scenario plugin
class TestScenarioPlugin(AbstractScenarioPlugin):
def run(self, run_uuid: str, scenario: str, krkn_config: dict, lib_telemetry, scenario_telemetry):
return 1 if scenario_should_fail else 0
def get_scenario_types(self) -> list[str]:
return ["test_scenario"]
# Create mock objects
mock_telemetry = Mock()
mock_telemetry.set_parameters_base64.return_value = "test_scenario.yaml"
mock_telemetry.get_telemetry_request_id.return_value = "test_request_id"
mock_telemetry.get_lib_kubernetes.return_value = Mock()
test_plugin = TestScenarioPlugin("test_scenario")
# Mock version files to be returned by search
mock_version_files = [
"/tmp/test_versions/123456789-test-uuid/test_scenario_123456789_abcdefgh.py"
]
with (
patch.object(RollbackConfig, 'auto', auto_rollback),
patch.object(RollbackConfig, 'versions_directory', "/tmp/test_versions"),
patch.object(RollbackConfig, 'search_rollback_version_files', return_value=mock_version_files) as mock_search,
patch('krkn.rollback.handler._parse_rollback_module') as mock_parse,
patch('krkn.scenario_plugins.abstract_scenario_plugin.utils.collect_and_put_ocp_logs'),
patch('krkn.scenario_plugins.abstract_scenario_plugin.signal_handler.signal_context') as mock_signal_context,
patch('krkn.scenario_plugins.abstract_scenario_plugin.time.sleep'),
patch('os.path.exists', return_value=True),
patch('os.rename') as mock_rename,
patch('os.remove') as mock_remove
):
# Make signal_context a no-op context manager
mock_signal_context.return_value.__enter__ = Mock(return_value=None)
mock_signal_context.return_value.__exit__ = Mock(return_value=None)
# Mock _parse_rollback_module to return test callable and content
mock_rollback_callable = Mock()
mock_rollback_content = Mock()
mock_parse.return_value = (mock_rollback_callable, mock_rollback_content)
# Call run_scenarios
test_plugin.run_scenarios(
run_uuid="test-uuid",
scenarios_list=["test_scenario.yaml"],
krkn_config={
"tunings": {"wait_duration": 0},
"telemetry": {"events_backup": False}
},
telemetry=mock_telemetry
)
# Verify results
if scenario_should_fail:
if auto_rollback:
# search_rollback_version_files should always be called when scenario fails
mock_search.assert_called_once_with("test-uuid", "test_scenario")
# When auto_rollback is True, _parse_rollback_module should be called
mock_parse.assert_called_once_with(mock_version_files[0])
# And the rollback callable should be executed
mock_rollback_callable.assert_called_once_with(mock_rollback_content, mock_telemetry)
# File should be renamed after successful execution
mock_rename.assert_called_once_with(
mock_version_files[0],
f"{mock_version_files[0]}.executed"
)
else:
# When scenario fail but auto_rollback is False, _parse_rollback_module should NOT be called
mock_search.assert_not_called()
mock_parse.assert_not_called()
mock_rollback_callable.assert_not_called()
mock_rename.assert_not_called()
else:
mock_search.assert_called_once_with("test-uuid", "test_scenario")
# Will remove the version files instead of renaming them if scenario succeeds
mock_remove.assert_called_once_with(
mock_version_files[0]
)
# When scenario succeeds, rollback should not be executed at all
mock_parse.assert_not_called()
mock_rollback_callable.assert_not_called()
mock_rename.assert_not_called()