kubeshark/integration/mcp_test.go
Alon Girmonsky 2ccd716a68 Add MCP registry metadata for official registry submission (#1835)
* Add MCP (Model Context Protocol) server command

Implement `kubeshark mcp` command that runs an MCP server over stdio,
enabling AI assistants to query Kubeshark's network visibility data.

Features:
- MCP protocol implementation (JSON-RPC 2.0 over stdio)
- Dynamic tool discovery from Hub's /api/mcp endpoint
- Local cluster management tools (check_kubeshark_status, start_kubeshark, stop_kubeshark)
- --url flag for direct connection to existing Kubeshark deployment
- --kubeconfig flag for proxy mode with kubectl
- --allow-destructive flag to enable start/stop operations (safe by default)
- --list-tools flag to display available tools
- --mcp-config flag to generate MCP client configuration
- 5-minute cache TTL for Hub tools/prompts
- Prompts for common analysis tasks
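
For orientation, a minimal client interaction over stdio might look like the sketch below. It is not taken from the Kubeshark sources; the request shape and the `2024-11-05` protocol version simply mirror the integration tests further down, and the `kubeshark` binary is assumed to be on PATH.

```go
// Hypothetical stdio client for `kubeshark mcp` (illustration only).
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os/exec"
)

func main() {
	cmd := exec.Command("kubeshark", "mcp") // assumes the binary is on PATH
	stdin, _ := cmd.StdinPipe()
	stdout, _ := cmd.StdoutPipe()
	reader := bufio.NewReader(stdout)
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	defer func() { stdin.Close(); cmd.Wait() }()

	// JSON-RPC 2.0 over stdio: one request object per line, one response per line.
	req, _ := json.Marshal(map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  "initialize",
		"params": map[string]interface{}{
			"protocolVersion": "2024-11-05",
			"capabilities":    map[string]interface{}{},
			"clientInfo":      map[string]interface{}{"name": "example", "version": "1.0"},
		},
	})
	stdin.Write(append(req, '\n'))

	resp, _ := reader.ReadString('\n')
	fmt.Print(resp) // expect capabilities and serverInfo in the result
}
```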

* Address code review comments for MCP implementation

- Add 30s timeout to HTTP client to prevent hanging requests
- Add scanner.Err() check after stdin processing loop
- Close HTTP response bodies to prevent resource leaks
- Add goroutine to wait on started process to prevent zombies
- Simplify polling loop by removing ineffective context check
- Advertise check_kubeshark_status in URL mode (was callable but hidden)
- Update documentation to clarify URL mode only disables start/stop
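
The first three items above follow standard Go patterns; a rough sketch (illustrative names and endpoint, not the actual mcpRunner.go code) looks like this:

```go
package example

import (
	"net/http"
	"os/exec"
	"time"
)

// fetchAndStart sketches the fixes above: a bounded HTTP client, an explicit
// response-body close, and a goroutine that reaps a started process so it
// cannot linger as a zombie. The URL path and command are illustrative.
func fetchAndStart(hubURL string) error {
	client := &http.Client{Timeout: 30 * time.Second} // prevent hanging requests

	resp, err := client.Get(hubURL + "/api/mcp")
	if err != nil {
		return err
	}
	defer resp.Body.Close() // avoid leaking connections

	cmd := exec.Command("kubeshark", "tap")
	if err := cmd.Start(); err != nil {
		return err
	}
	go func() {
		_ = cmd.Wait() // reap the child once it exits
	}()
	return nil
}
```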

* Fix lint errors in mcpRunner.go

- Use type conversion instead of struct literals for hubMCPTool -> mcpTool
  and hubMCPPromptArg -> mcpPromptArg (S1016 gosimple)
- Lowercase error string to follow Go conventions (ST1005 staticcheck)
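
The S1016 fix is the standard struct-conversion pattern; a generic sketch (field names invented for illustration, not the real hubMCPTool/mcpTool definitions):

```go
package example

type hubMCPTool struct {
	Name        string `json:"name"`
	Description string `json:"description"`
}

type mcpTool struct {
	Name        string `json:"name"`
	Description string `json:"description"`
}

// Before (flagged by gosimple S1016): copying field by field via a struct literal.
func convertLiteral(h hubMCPTool) mcpTool {
	return mcpTool{Name: h.Name, Description: h.Description}
}

// After: the types share an identical underlying structure, so a plain conversion works.
func convert(h hubMCPTool) mcpTool {
	return mcpTool(h)
}
```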

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add MCP server unit tests

Comprehensive unit tests for the MCP server implementation:
- Protocol tests (initialize, tools/list, tools/call, prompts/list, prompts/get)
- Tool tests (check_kubeshark_status, start_kubeshark, stop_kubeshark)
- Hub integration tests (tool fetching, caching, prompt handling)
- Error handling tests
- Edge case tests

* Fix MCP unit tests to use correct /tools/call endpoint

- Update all Hub tool tests to use POST /tools/call endpoint instead
  of individual paths like /workloads, /calls, /stats
- Verify arguments in POST body instead of URL query parameters
- Add newMockHubHandler helper for proper Hub endpoint mocking
- Split TestMCP_ToolsList into three tests:
  - TestMCP_ToolsList_CLIOnly: Tests without Hub backend
  - TestMCP_ToolsList_WithDestructive: Tests with destructive flag
  - TestMCP_ToolsList_WithHubBackend: Tests with mock Hub providing tools
- Fix TestMCP_FullConversation to mock Hub MCP endpoint correctly
- Rename URL encoding tests for clarity
- All tests now correctly reflect the implementation
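
`newMockHubHandler` is only named above; the sketch below is a plausible shape for such a helper, not the actual test code. It serves a Hub-like POST /tools/call endpoint and exposes the arguments decoded from the request body, matching the behaviors this commit asserts.

```go
package example

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
)

// newMockHubHandler (hypothetical signature) serves /tools/call and reports
// each decoded call to the test via onCall.
func newMockHubHandler(onCall func(name string, args map[string]interface{})) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/tools/call", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var body struct {
			Name      string                 `json:"name"`
			Arguments map[string]interface{} `json:"arguments"`
		}
		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		onCall(body.Name, body.Arguments) // arguments arrive in the POST body, not URL query parameters
		_ = json.NewEncoder(w).Encode(map[string]interface{}{"content": []interface{}{}})
	})
	return mux
}

// startMockHub wires the handler into an httptest server for a test.
func startMockHub(onCall func(string, map[string]interface{})) *httptest.Server {
	return httptest.NewServer(newMockHubHandler(onCall))
}
```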

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Simplify MCP unit tests

- Remove section header comments (10 headers)
- Consolidate similar tests using table-driven patterns
- Simplify test assertions with more concise checks
- Combine edge case tests into single test function
- Reduce verbose test structures

Total reduction: 1477 → 495 lines (66%)
All 24 tests still pass.
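
As an illustration of the table-driven consolidation (this exact test does not exist in the repository; `isToolAllowed` is a stand-in for the real permission check in mcpRunner.go):

```go
package example

import "testing"

func TestDestructiveToolsBlockedReadOnly(t *testing.T) {
	cases := []struct {
		name string
		tool string
	}{
		{"start blocked", "start_kubeshark"},
		{"stop blocked", "stop_kubeshark"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if isToolAllowed(tc.tool, false /* allowDestructive */) {
				t.Errorf("%s should be rejected in read-only mode", tc.tool)
			}
		})
	}
}

// isToolAllowed is a hypothetical stand-in for the real permission check.
func isToolAllowed(tool string, allowDestructive bool) bool {
	switch tool {
	case "start_kubeshark", "stop_kubeshark":
		return allowDestructive
	default:
		return true
	}
}
```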

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add MCP integration test framework

Add integration tests that run against a real Kubernetes cluster:
- MCP protocol tests (initialize, tools/list, prompts/list)
- Cluster management tests (check_kubeshark_status, start_kubeshark, stop_kubeshark)
- Full lifecycle test (check -> start -> check -> stop -> check)
- API tools tests (list_workloads, list_api_calls, get_api_stats)

Also includes:
- Makefile targets for running integration tests
- Test helper functions (startMCPSession, cleanupKubeshark, etc.)
- Documentation (README.md, TEMPLATE.md, ISSUE_TEMPLATE.md)

* Address review comments on integration tests

Makefile:
- Use unique temporary files (mktemp) instead of shared /tmp/integration-test.log
  to prevent race conditions when multiple test targets run concurrently
- Remove redundant test-integration-verbose target (test-integration already uses -v)
- Add cleanup (rm -f) for temporary log files

integration/mcp_test.go:
- Capture stderr from MCP server for debugging failures
- Add getStderr() method to mcpSession for accessing captured stderr
- Fix potential goroutine leak by adding return statements after t.Fatalf
- Remove t.Run subtests in TestMCP_APIToolsRequireKubeshark to make the
  sequential execution over a shared session explicit
- Fix benchmark to use getKubesharkBinary helper for consistency
- Add Kubernetes cluster check to benchmark (graceful skip)
- Add proper error handling for pipe creation in benchmark
- Remove unnecessary bytes import workaround (now actually used for stderr)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Simplify and clean up MCP integration tests

- Remove unrelated L4 viewer files (1239 lines)
- Remove template/issue documentation files (419 lines)
- Trim README to essential content only
- Remove TEMPLATE comments from common_test.go
- Add initialize() helper to reduce test boilerplate
- Add hasKubernetesCluster() helper for benchmarks
- Simplify all test functions with consistent patterns

Total reduction: 2964 → 866 lines (71%)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add MCP registry metadata for official registry submission

Add metadata files for submitting Kubeshark MCP server to the official
MCP registry at registry.modelcontextprotocol.io:

- mcp/server.json: Registry metadata with tools, prompts, and configuration
- mcp/README.md: MCP server documentation and usage guide

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 10:39:42 -08:00


//go:build integration

package integration

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"io"
	"os/exec"
	"strings"
	"testing"
	"time"
)

// MCPRequest represents a JSON-RPC request
type MCPRequest struct {
	JSONRPC string      `json:"jsonrpc"`
	ID      int         `json:"id"`
	Method  string      `json:"method"`
	Params  interface{} `json:"params,omitempty"`
}

// MCPResponse represents a JSON-RPC response
type MCPResponse struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      int             `json:"id"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *MCPError       `json:"error,omitempty"`
}

// MCPError represents a JSON-RPC error
type MCPError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// mcpSession represents a running MCP server session
type mcpSession struct {
	cmd    *exec.Cmd
	stdin  io.WriteCloser
	stdout *bufio.Reader
	stderr *bytes.Buffer // Captured stderr for debugging
	cancel context.CancelFunc
}

// startMCPSession starts an MCP server and returns a session for sending requests.
// By default, starts in read-only mode (no --allow-destructive).
func startMCPSession(t *testing.T, binary string, args ...string) *mcpSession {
	t.Helper()
	ctx, cancel := context.WithCancel(context.Background())
	cmdArgs := append([]string{"mcp"}, args...)
	cmd := exec.CommandContext(ctx, binary, cmdArgs...)

	stdin, err := cmd.StdinPipe()
	if err != nil {
		cancel()
		t.Fatalf("Failed to create stdin pipe: %v", err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		cancel()
		t.Fatalf("Failed to create stdout pipe: %v", err)
	}

	// Capture stderr for debugging
	var stderrBuf bytes.Buffer
	cmd.Stderr = &stderrBuf

	if err := cmd.Start(); err != nil {
		cancel()
		t.Fatalf("Failed to start MCP server: %v", err)
	}

	return &mcpSession{
		cmd:    cmd,
		stdin:  stdin,
		stdout: bufio.NewReader(stdout),
		stderr: &stderrBuf,
		cancel: cancel,
	}
}

// startMCPSessionWithDestructive starts an MCP server with --allow-destructive flag.
func startMCPSessionWithDestructive(t *testing.T, binary string, args ...string) *mcpSession {
	t.Helper()
	allArgs := append([]string{"--allow-destructive"}, args...)
	return startMCPSession(t, binary, allArgs...)
}

// sendRequest sends a JSON-RPC request and returns the response (30s timeout).
func (s *mcpSession) sendRequest(t *testing.T, req MCPRequest) MCPResponse {
	t.Helper()
	return s.sendRequestWithTimeout(t, req, 30*time.Second)
}

// sendRequestWithTimeout sends a JSON-RPC request with a custom timeout.
func (s *mcpSession) sendRequestWithTimeout(t *testing.T, req MCPRequest, timeout time.Duration) MCPResponse {
	t.Helper()
	reqBytes, err := json.Marshal(req)
	if err != nil {
		t.Fatalf("Failed to marshal request: %v", err)
	}
	t.Logf("Sending: %s", string(reqBytes))
	if _, err := s.stdin.Write(append(reqBytes, '\n')); err != nil {
		t.Fatalf("Failed to write request: %v", err)
	}

	// Read response with timeout
	responseChan := make(chan string, 1)
	errChan := make(chan error, 1)
	go func() {
		line, err := s.stdout.ReadString('\n')
		if err != nil {
			errChan <- err
			return
		}
		responseChan <- line
	}()

	select {
	case line := <-responseChan:
		t.Logf("Received: %s", strings.TrimSpace(line))
		var resp MCPResponse
		if err := json.Unmarshal([]byte(line), &resp); err != nil {
			t.Fatalf("Failed to unmarshal response: %v\nResponse: %s", err, line)
		}
		return resp
	case err := <-errChan:
		t.Fatalf("Failed to read response: %v", err)
		return MCPResponse{}
	case <-time.After(timeout):
		t.Fatalf("Timeout waiting for MCP response after %v", timeout)
		return MCPResponse{}
	}
}

// callTool invokes an MCP tool and returns the response (30s timeout).
func (s *mcpSession) callTool(t *testing.T, id int, toolName string, args map[string]interface{}) MCPResponse {
	t.Helper()
	return s.callToolWithTimeout(t, id, toolName, args, 30*time.Second)
}

// callToolWithTimeout invokes an MCP tool with a custom timeout.
func (s *mcpSession) callToolWithTimeout(t *testing.T, id int, toolName string, args map[string]interface{}, timeout time.Duration) MCPResponse {
	t.Helper()
	return s.sendRequestWithTimeout(t, MCPRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "tools/call",
		Params: map[string]interface{}{
			"name":      toolName,
			"arguments": args,
		},
	}, timeout)
}

// close terminates the MCP session.
func (s *mcpSession) close() {
	s.cancel()
	_ = s.cmd.Wait()
}

// getStderr returns any captured stderr output (useful for debugging failures).
func (s *mcpSession) getStderr() string {
	if s.stderr == nil {
		return ""
	}
	return s.stderr.String()
}

// initialize sends the MCP initialize request and returns the response.
func (s *mcpSession) initialize(t *testing.T, id int) MCPResponse {
	t.Helper()
	return s.sendRequest(t, MCPRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "initialize",
		Params: map[string]interface{}{
			"protocolVersion": "2024-11-05",
			"capabilities":    map[string]interface{}{},
			"clientInfo":      map[string]interface{}{"name": "test", "version": "1.0"},
		},
	})
}

// TestMCP_Initialize tests the MCP initialization handshake.
func TestMCP_Initialize(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSession(t, getKubesharkBinary(t))
	defer session.close()

	resp := session.initialize(t, 1)
	if resp.Error != nil {
		t.Fatalf("Initialize failed: %s", resp.Error.Message)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}
	if _, ok := result["capabilities"]; !ok {
		t.Error("Response missing capabilities")
	}
	if _, ok := result["serverInfo"]; !ok {
		t.Error("Response missing serverInfo")
	}
}

// TestMCP_ToolsList_ReadOnly tests that tools/list returns only safe tools in read-only mode.
func TestMCP_ToolsList_ReadOnly(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSession(t, getKubesharkBinary(t))
	defer session.close()
	session.initialize(t, 1)

	resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
	if resp.Error != nil {
		t.Fatalf("tools/list failed: %s", resp.Error.Message)
	}

	var result struct {
		Tools []struct{ Name string `json:"name"` } `json:"tools"`
	}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}

	toolNames := make(map[string]bool)
	for _, tool := range result.Tools {
		toolNames[tool.Name] = true
	}
	if !toolNames["check_kubeshark_status"] {
		t.Error("Missing expected tool: check_kubeshark_status")
	}
	if toolNames["start_kubeshark"] || toolNames["stop_kubeshark"] {
		t.Error("Destructive tools should not be available in read-only mode")
	}
}

// TestMCP_ToolsList_WithDestructive tests that tools/list includes destructive tools when flag is set.
func TestMCP_ToolsList_WithDestructive(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSessionWithDestructive(t, getKubesharkBinary(t))
	defer session.close()
	session.initialize(t, 1)

	resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
	if resp.Error != nil {
		t.Fatalf("tools/list failed: %s", resp.Error.Message)
	}

	var result struct {
		Tools []struct{ Name string `json:"name"` } `json:"tools"`
	}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}

	toolNames := make(map[string]bool)
	for _, tool := range result.Tools {
		toolNames[tool.Name] = true
	}
	for _, expected := range []string{"check_kubeshark_status", "start_kubeshark", "stop_kubeshark"} {
		if !toolNames[expected] {
			t.Errorf("Missing expected tool: %s", expected)
		}
	}
}

// TestMCP_CheckKubesharkStatus_NotRunning tests check_kubeshark_status when Kubeshark is not running.
func TestMCP_CheckKubesharkStatus_NotRunning(t *testing.T) {
	requireKubernetesCluster(t)
	binary := getKubesharkBinary(t)
	cleanupKubeshark(t, binary)

	session := startMCPSession(t, binary)
	defer session.close()
	session.initialize(t, 1)

	resp := session.callTool(t, 2, "check_kubeshark_status", nil)
	if resp.Error != nil {
		t.Fatalf("check_kubeshark_status failed: %s", resp.Error.Message)
	}

	var result struct {
		Content []struct{ Text string `json:"text"` } `json:"content"`
	}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}
	if len(result.Content) == 0 || (!strings.Contains(result.Content[0].Text, "not running") && !strings.Contains(result.Content[0].Text, "NOT")) {
		t.Errorf("Expected 'not running' status")
	}
}

// TestMCP_StartKubeshark tests the start_kubeshark tool.
func TestMCP_StartKubeshark(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping in short mode")
	}
	requireKubernetesCluster(t)
	binary := getKubesharkBinary(t)
	cleanupKubeshark(t, binary)
	t.Cleanup(func() { cleanupKubeshark(t, binary) })

	session := startMCPSessionWithDestructive(t, binary)
	defer session.close()
	session.initialize(t, 1)

	resp := session.callToolWithTimeout(t, 2, "start_kubeshark", nil, 3*time.Minute)
	if resp.Error != nil {
		t.Fatalf("start_kubeshark failed: %s", resp.Error.Message)
	}
	if !isKubesharkRunning(t) {
		t.Error("Kubeshark should be running after start_kubeshark")
	}
}

// TestMCP_StartKubeshark_WithoutFlag tests that start_kubeshark fails without --allow-destructive.
func TestMCP_StartKubeshark_WithoutFlag(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSession(t, getKubesharkBinary(t))
	defer session.close()
	session.initialize(t, 1)

	resp := session.callTool(t, 2, "start_kubeshark", nil)

	var result struct {
		Content []struct{ Text string `json:"text"` } `json:"content"`
		IsError bool                                  `json:"isError"`
	}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}
	if !result.IsError {
		t.Error("Expected isError=true without --allow-destructive")
	}
}

// TestMCP_StopKubeshark tests the stop_kubeshark tool.
func TestMCP_StopKubeshark(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping in short mode")
	}
	requireKubernetesCluster(t)
	binary := getKubesharkBinary(t)

	session := startMCPSessionWithDestructive(t, binary)
	defer session.close()
	session.initialize(t, 0)

	// Start Kubeshark if not running
	if !isKubesharkRunning(t) {
		resp := session.callToolWithTimeout(t, 1, "start_kubeshark", nil, 2*time.Minute)
		if resp.Error != nil {
			t.Skipf("Could not start Kubeshark: %v", resp.Error.Message)
		}
	}

	resp := session.callToolWithTimeout(t, 2, "stop_kubeshark", nil, 2*time.Minute)
	if resp.Error != nil {
		t.Fatalf("stop_kubeshark failed: %s", resp.Error.Message)
	}

	time.Sleep(5 * time.Second)
	if isKubesharkRunning(t) {
		t.Error("Kubeshark should not be running after stop_kubeshark")
	}
}

// TestMCP_StopKubeshark_WithoutFlag tests that stop_kubeshark fails without --allow-destructive.
func TestMCP_StopKubeshark_WithoutFlag(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSession(t, getKubesharkBinary(t))
	defer session.close()
	session.initialize(t, 1)

	resp := session.callTool(t, 2, "stop_kubeshark", nil)

	var result struct {
		IsError bool `json:"isError"`
	}
	if err := json.Unmarshal(resp.Result, &result); err != nil {
		t.Fatalf("Failed to parse result: %v", err)
	}
	if !result.IsError {
		t.Error("Expected isError=true without --allow-destructive")
	}
}

// TestMCP_FullLifecycle tests the complete lifecycle: check -> start -> check -> stop -> check
func TestMCP_FullLifecycle(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping in short mode")
	}
	requireKubernetesCluster(t)
	binary := getKubesharkBinary(t)
	cleanupKubeshark(t, binary)

	session := startMCPSessionWithDestructive(t, binary)
	defer session.close()
	session.initialize(t, 1)

	// Check -> Start -> Check -> Stop -> Check
	if resp := session.callTool(t, 2, "check_kubeshark_status", nil); resp.Error != nil {
		t.Fatalf("Initial status check failed: %s", resp.Error.Message)
	}
	if resp := session.callToolWithTimeout(t, 3, "start_kubeshark", nil, 3*time.Minute); resp.Error != nil {
		t.Fatalf("Start failed: %s", resp.Error.Message)
	}
	if err := waitForKubesharkReady(t, binary, startupTimeout); err != nil {
		t.Fatalf("Kubeshark did not become ready: %v", err)
	}
	if resp := session.callTool(t, 4, "check_kubeshark_status", nil); resp.Error != nil {
		t.Fatalf("Status check after start failed: %s", resp.Error.Message)
	}
	if resp := session.callToolWithTimeout(t, 5, "stop_kubeshark", nil, 2*time.Minute); resp.Error != nil {
		t.Fatalf("Stop failed: %s", resp.Error.Message)
	}
	time.Sleep(5 * time.Second)
	if resp := session.callTool(t, 6, "check_kubeshark_status", nil); resp.Error != nil {
		t.Fatalf("Final status check failed: %s", resp.Error.Message)
	}
}

// TestMCP_APIToolsRequireKubeshark tests that API tools return helpful errors when Kubeshark isn't running.
func TestMCP_APIToolsRequireKubeshark(t *testing.T) {
	requireKubernetesCluster(t)
	binary := getKubesharkBinary(t)
	cleanupKubeshark(t, binary)

	session := startMCPSession(t, binary)
	defer session.close()
	session.initialize(t, 1)

	for i, tool := range []string{"list_workloads", "list_api_calls", "get_api_stats"} {
		resp := session.callTool(t, i+2, tool, nil)
		// Either error or helpful message is acceptable
		if resp.Error != nil {
			t.Logf("%s returned error (expected): %s", tool, resp.Error.Message)
		}
	}
}

// TestMCP_SetFlags tests that --set flags are passed correctly.
func TestMCP_SetFlags(t *testing.T) {
	requireKubernetesCluster(t)
	session := startMCPSession(t, getKubesharkBinary(t), "--set", "tap.namespaces={default}")
	defer session.close()
	session.initialize(t, 1)

	resp := session.sendRequest(t, MCPRequest{JSONRPC: "2.0", ID: 2, Method: "tools/list"})
	if resp.Error != nil {
		t.Fatalf("tools/list failed with --set flags: %s", resp.Error.Message)
	}
}

// BenchmarkMCP_CheckStatus benchmarks the check_kubeshark_status tool.
func BenchmarkMCP_CheckStatus(b *testing.B) {
	if testing.Short() {
		b.Skip("Skipping benchmark in short mode")
	}
	if !hasKubernetesCluster() {
		b.Skip("Skipping: no Kubernetes cluster available")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cmd := exec.CommandContext(ctx, getKubesharkBinary(b), "mcp")
	stdin, _ := cmd.StdinPipe()
	stdout, _ := cmd.StdoutPipe()
	reader := bufio.NewReader(stdout)
	if err := cmd.Start(); err != nil {
		b.Fatalf("Failed to start MCP: %v", err)
	}
	defer func() { cancel(); _ = cmd.Wait() }()

	// Initialize
	initReq, _ := json.Marshal(MCPRequest{
		JSONRPC: "2.0", ID: 0, Method: "initialize",
		Params: map[string]interface{}{
			"protocolVersion": "2024-11-05",
			"capabilities":    map[string]interface{}{},
			"clientInfo":      map[string]interface{}{"name": "bench", "version": "1.0"},
		},
	})
	_, _ = stdin.Write(append(initReq, '\n'))
	_, _ = reader.ReadString('\n')

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		req, _ := json.Marshal(MCPRequest{
			JSONRPC: "2.0", ID: i + 1, Method: "tools/call",
			Params: map[string]interface{}{"name": "check_kubeshark_status", "arguments": map[string]interface{}{}},
		})
		if _, err := stdin.Write(append(req, '\n')); err != nil {
			b.Fatalf("Write failed: %v", err)
		}
		if _, err := reader.ReadString('\n'); err != nil {
			b.Fatalf("Read failed: %v", err)
		}
	}
}