Handle optional secrets and config maps, remove some false errors

This commit is contained in:
Philippe Merle
2025-01-06 23:39:13 +01:00
parent 02a07a1489
commit 8db2cdd194
2 changed files with 34 additions and 14 deletions

View File

@@ -172,11 +172,14 @@ class EdgesContext(list):
def add_volumes(volumes):
for volume in volumes:
if "configMap" in volume:
configMap = volume["configMap"]
configMap_id = f"{configMap.get('name')}/{self.namespace}/ConfigMap/v1"
if configMap.get("optional") is True:
if configMap_id not in resources:
print(f"Warning: optional config map {configMap_id} undefined!")
continue # skip it
self.append([
"%s/%s/ConfigMap/v1" % (
volume["configMap"]["name"],
self.namespace
),
configMap_id,
"REFERENCE"
])
elif "secret" in volume:
@@ -185,11 +188,13 @@ class EdgesContext(list):
if secretName is None:
# Warning: jenkins chart uses 'name' instead of 'secretName'!
secretName = secret.get("name")
secret_id = f"{secretName}/{self.namespace}/Secret/v1"
if secret.get("optional") is True:
if secret_id not in resources:
print(f"Warning: optional secret {secret_id} undefined!")
continue # skip it
self.append([
"%s/%s/Secret/v1" % (
secretName,
self.namespace
),
secret_id,
"REFERENCE"
])
elif "persistentVolumeClaim" in volume:
@@ -225,11 +230,13 @@ class EdgesContext(list):
)
secretKeyRefName = query_path(env, "valueFrom.secretKeyRef.name")
if secretKeyRefName != None:
secret_id = f"{secretKeyRefName}/{self.namespace}/Secret/v1"
if query_path(env, "valueFrom.secretKeyRef.optional") is True:
if secret_id not in resources:
print(f"Warning: optional secret {secret_id} undefined!")
continue # skip it
target_resources.add(
"%s/%s/Secret/v1" % (
secretKeyRefName,
self.namespace
)
secret_id
)
for target_resource in target_resources:
self.append([target_resource, "REFERENCE"])
@@ -305,10 +312,13 @@ class EdgesContext(list):
def add_subjects(self):
for subject in query_path(self.resource, "subjects", []):
if subject.get("namespace"):
namespace = subject.get("namespace")
if namespace is None and subject["kind"] == "ServiceAccount":
namespace = get_namespace(self.resource)
if namespace != None:
rid = "%s/%s/%s/%s" % (
subject["name"],
subject["namespace"],
namespace,
subject["kind"],
query_path(subject, "apiGroup", "v1")
)
@@ -364,6 +374,9 @@ class EdgesContext(list):
])
def add_all_workload_resources(self, path, custom_workload_resources={}, edge_kind="SELECTOR"):
selector = query_path(self.resource, path)
if selector is None:
return
build_in_workload_resources = {
"Pod": "metadata.labels",
"Deployment": "spec.template.metadata.labels",
@@ -555,6 +568,9 @@ with Diagram("", filename=args.output, show=False, direction="TB", outformat=arg
diagram_nodes[edge_rid] >> Edge(color="white", style="invisible") >> diagram_nodes[resource_id]
diagram_nodes[resource_id] >> Edge(**edge_config) >> diagram_nodes[edge_rid]
except KeyError as ke:
if edge_rid in config["cluster-resources"]:
print(f"Info: referenced {edge_rid} resource is provided by K8s clusters.")
continue # skip this edge as the resource is provided by K8s clusters.
print("Error: %s resource not found!" % ke)
tmp = str(edge_rid).split("/")
if tmp[2] in ["ServiceAccount", "Service", "ConfigMap", "Secret"]: #TODO: avoid specific cases

View File

@@ -315,3 +315,7 @@ nodes:
#TODO: add other Kubernetes resource types.
# PSP diagrams.k8s.others.PSP
# Vol diagrams.k8s.storage.Vol, Volume (alias)
cluster-resources:
- default/default/ServiceAccount/v1
- system:auth-delegator/ClusterRole/rbac.authorization.k8s.io/v1