Add configuration scan tools and resource handling

Implemented two new tools: 'list_configuration_security_scan_manifests' for listing available configuration scan manifests and 'get_configuration_security_scan_manifest' for retrieving details of a specific manifest. Added a resource template for configuration manifests and integrated these tools into the mcpserver.

Signed-off-by: Ben <ben@armosec.io>
This commit is contained in:
Ben
2025-06-09 09:11:28 +03:00
parent fa05dcd00d
commit 1ea4e0c304

View File

@@ -87,6 +87,46 @@ func createVulnerabilityToolsAndResources(ksServer *KubescapeMcpserver) {
}
func createConfigurationsToolsAndResources(ksServer *KubescapeMcpserver) {
// Tool to list configuration manifests
listConfigsTool := mcp.NewTool(
"list_configuration_security_scan_manifests",
mcp.WithDescription("Discover available security configuration scan results at workload level (this returns a list of manifests, not the scan results themselves, to get the scan results, use the get_configuration_security_scan_manifest tool)"),
mcp.WithString("namespace",
mcp.Description("Filter by namespace (optional)"),
),
)
ksServer.s.AddTool(listConfigsTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return ksServer.CallTool("list_configuration_security_scan_manifests", request.Params.Arguments.(map[string]interface{}))
})
getConfigDetailsTool := mcp.NewTool(
"get_configuration_security_scan_manifest",
mcp.WithDescription("Get details of a specific security configuration scan result"),
mcp.WithString("namespace",
mcp.Description("Namespace of the manifest (optional, defaults to 'kubescape')"),
),
mcp.WithString("manifest_name",
mcp.Required(),
mcp.Description("Name of the configuration manifest to get details for (get this from the list_configuration_security_scan_manifests tool)"),
),
)
ksServer.s.AddTool(getConfigDetailsTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
return ksServer.CallTool("get_configuration_security_scan_manifest", request.Params.Arguments.(map[string]interface{}))
})
configManifestTemplate := mcp.NewResourceTemplate(
"kubescape://configuration-manifests/{namespace}/{manifest_name}",
"Configuration Security Scan Manifest",
mcp.WithTemplateDescription("Complete configuration scan manifest for a specific workload. Use 'list_configuration_security_scan_manifests' tool to discover available manifests."),
mcp.WithTemplateMIMEType("application/json"),
)
ksServer.s.AddResourceTemplate(configManifestTemplate, ksServer.ReadConfigurationResource)
}
func (ksServer *KubescapeMcpserver) ReadResource(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
uri := request.Params.URI
// Validate the URI and check if it starts with kubescape://vulnerability-manifests/
@@ -149,6 +189,31 @@ func (ksServer *KubescapeMcpserver) ReadResource(ctx context.Context, request mc
}}, nil
}
func (ksServer *KubescapeMcpserver) ReadConfigurationResource(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
uri := request.Params.URI
if !strings.HasPrefix(uri, "kubescape://configuration-manifests/") {
return nil, fmt.Errorf("invalid URI: %s", uri)
}
parts := strings.Split(uri[len("kubescape://configuration-manifests/"):], "/")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid URI: %s", uri)
}
namespace := parts[0]
manifestName := parts[1]
manifest, err := ksServer.ksClient.WorkloadConfigurationScans(namespace).Get(ctx, manifestName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get configuration manifest: %s", err)
}
responseJson, err := json.Marshal(manifest)
if err != nil {
return nil, fmt.Errorf("failed to marshal configuration manifest: %s", err)
}
return []mcp.ResourceContents{mcp.TextResourceContents{
URI: uri,
Text: string(responseJson),
}}, nil
}
func (ksServer *KubescapeMcpserver) CallTool(name string, arguments map[string]interface{}) (*mcp.CallToolResult, error) {
switch name {
case "list_vulnerability_manifests":
@@ -287,6 +352,67 @@ func (ksServer *KubescapeMcpserver) CallTool(name string, arguments map[string]i
},
},
}, nil
case "list_configuration_security_scan_manifests":
namespace, ok := arguments["namespace"]
if !ok {
namespace = "kubescape"
}
manifests, err := ksServer.ksClient.WorkloadConfigurationScans(namespace.(string)).List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
log.Printf("Found %d configuration manifests", len(manifests.Items))
configManifests := []map[string]interface{}{}
for _, manifest := range manifests.Items {
item := map[string]interface{}{
"namespace": manifest.Namespace,
"manifest_name": manifest.Name,
"resource_uri": fmt.Sprintf("kubescape://configuration-manifests/%s/%s", manifest.Namespace, manifest.Name),
}
configManifests = append(configManifests, item)
}
result := map[string]interface{}{
"configuration_manifests": map[string]interface{}{
"manifests": configManifests,
},
"available_templates": map[string]string{
"configuration_manifest_details": "kubescape://configuration-manifests/{namespace}/{manifest_name}",
},
}
content, _ := json.Marshal(result)
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: string(content),
},
},
}, nil
case "get_configuration_security_scan_manifest":
namespace, ok := arguments["namespace"]
if !ok {
namespace = "kubescape"
}
manifestName, ok := arguments["manifest_name"]
if !ok {
return nil, fmt.Errorf("manifest_name is required")
}
manifest, err := ksServer.ksClient.WorkloadConfigurationScans(namespace.(string)).Get(context.Background(), manifestName.(string), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get configuration manifest: %s", err)
}
responseJson, err := json.Marshal(manifest)
if err != nil {
return nil, fmt.Errorf("failed to marshal configuration manifest: %s", err)
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: string(responseJson),
},
},
}, nil
default:
return nil, fmt.Errorf("unknown tool: %s", name)
}
@@ -315,7 +441,9 @@ func mcpServerEntrypoint() error {
}
// Creating Kubescape tools and resources
createVulnerabilityToolsAndResources(ksServer)
createConfigurationsToolsAndResources(ksServer)
// Start the server
if err := server.ServeStdio(s); err != nil {