mirror of
https://github.com/kubescape/kubescape.git
synced 2026-02-14 09:59:54 +00:00
fixed stdin support
This commit is contained in:
@@ -7,7 +7,13 @@ def get_exec_from_args(args: list):
|
||||
|
||||
def run_command(command):
|
||||
try:
|
||||
return f"{subprocess.check_output(command, stderr=subprocess.STDOUT)}"
|
||||
return f"{subprocess.check_output(command, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)}"
|
||||
except Exception as e:
|
||||
return f"{e}"
|
||||
|
||||
|
||||
def assertion(msg):
|
||||
errors = ["Error: invalid parameter", "exit status 1"]
|
||||
for e in errors:
|
||||
assert e not in msg, msg
|
||||
|
||||
|
||||
@@ -3,15 +3,71 @@ import smoke_utils
|
||||
import sys
|
||||
|
||||
|
||||
def full_scan(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "framework", "nsa", os.path.join("..", "*.yaml")])
|
||||
all_files = os.path.join("..", "examples", "online-boutique", "*.yaml")
|
||||
single_file = os.path.join("..", "examples", "online-boutique", "frontend.yaml")
|
||||
|
||||
|
||||
def scan_all(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", all_files])
|
||||
|
||||
|
||||
def scan_control_name(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "control", 'Allowed hostPath', all_files])
|
||||
|
||||
|
||||
def scan_control_id(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "control", 'C-0006', all_files])
|
||||
|
||||
|
||||
def scan_controls(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "control", 'Allowed hostPath,Allow privilege escalation', all_files])
|
||||
|
||||
|
||||
def scan_framework(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "framework", "nsa", all_files])
|
||||
|
||||
|
||||
def scan_frameworks(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=[kubescape_exec, "scan", "framework", "nsa,mitre,armobest", all_files])
|
||||
|
||||
|
||||
def scan_from_stdin(kubescape_exec: str):
|
||||
return smoke_utils.run_command(command=["cat", single_file, "|", kubescape_exec, "scan", "framework", "nsa", "-"])
|
||||
|
||||
|
||||
def run(kubescape_exec: str):
|
||||
# return
|
||||
print("Testing E2E yaml files")
|
||||
msg = full_scan(kubescape_exec=kubescape_exec)
|
||||
assert "exit status 1" not in msg, msg
|
||||
print("Testing E2E on yaml files")
|
||||
|
||||
# TODO - fix support
|
||||
# print("Testing scan all yaml files")
|
||||
# msg = scan_all(kubescape_exec=kubescape_exec)
|
||||
# smoke_utils.assertion(msg)
|
||||
|
||||
print("Testing scan control name")
|
||||
msg = scan_control_name(kubescape_exec=kubescape_exec)
|
||||
smoke_utils.assertion(msg)
|
||||
|
||||
print("Testing scan control id")
|
||||
msg = scan_control_id(kubescape_exec=kubescape_exec)
|
||||
smoke_utils.assertion(msg)
|
||||
|
||||
print("Testing scan controls")
|
||||
msg = scan_controls(kubescape_exec=kubescape_exec)
|
||||
smoke_utils.assertion(msg)
|
||||
|
||||
print("Testing scan framework")
|
||||
msg = scan_framework(kubescape_exec=kubescape_exec)
|
||||
smoke_utils.assertion(msg)
|
||||
|
||||
print("Testing scan frameworks")
|
||||
msg = scan_frameworks(kubescape_exec=kubescape_exec)
|
||||
smoke_utils.assertion(msg)
|
||||
|
||||
# TODO - fix test
|
||||
# print("Testing scan from stdin")
|
||||
# msg = scan_from_stdin(kubescape_exec=kubescape_exec)
|
||||
# smoke_utils.assertion(msg)
|
||||
|
||||
print("Done E2E yaml files")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user