mirror of
https://github.com/krkn-chaos/krkn.git
synced 2026-04-15 06:57:28 +00:00
Fixes #265: Replace Powerfulseal and introduce Wolkenwalze SDK for plugin system
This commit is contained in:
51
kraken/plugins/run_python_plugin.py
Normal file
51
kraken/plugins/run_python_plugin.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import dataclasses
|
||||
import io
|
||||
import subprocess
|
||||
import sys
|
||||
import typing
|
||||
|
||||
from arcaflow_plugin_sdk import plugin
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class RunPythonFileInput:
|
||||
filename: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class RunPythonFileOutput:
|
||||
stdout: str
|
||||
stderr: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class RunPythonFileError:
|
||||
exit_code: int
|
||||
stdout: str
|
||||
stderr: str
|
||||
|
||||
|
||||
@plugin.step(
|
||||
id="run_python",
|
||||
name="Run a Python script",
|
||||
description="Run a specified Python script",
|
||||
outputs={"success": RunPythonFileOutput, "error": RunPythonFileError}
|
||||
)
|
||||
def run_python_file(params: RunPythonFileInput) -> typing.Tuple[
|
||||
str,
|
||||
typing.Union[RunPythonFileOutput, RunPythonFileError]
|
||||
]:
|
||||
run_results = subprocess.run(
|
||||
[sys.executable, params.filename],
|
||||
capture_output=True
|
||||
)
|
||||
if run_results.returncode == 0:
|
||||
return "success", RunPythonFileOutput(
|
||||
str(run_results.stdout, 'utf-8'),
|
||||
str(run_results.stderr, 'utf-8')
|
||||
)
|
||||
return "error", RunPythonFileError(
|
||||
run_results.returncode,
|
||||
str(run_results.stdout, 'utf-8'),
|
||||
str(run_results.stderr, 'utf-8')
|
||||
)
|
||||
Reference in New Issue
Block a user