mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-26 15:53:55 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68c4ee9a4f | ||
|
|
bfbbc27e62 | ||
|
|
e2df973fe6 | ||
|
|
656809512b | ||
|
|
b96542a8ed | ||
|
|
a55f51f0e7 |
19
Makefile
19
Makefile
@@ -8,7 +8,7 @@ SHELL=/bin/bash
|
||||
# HELP
|
||||
# This will output the help for each task
|
||||
# thanks to https://marmelab.com/blog/2016/02/29/auto-documented-makefile.html
|
||||
.PHONY: help ui agent cli tap docker
|
||||
.PHONY: help ui extensions extensions-debug agent agent-debug cli tap docker
|
||||
|
||||
help: ## This help.
|
||||
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
|
||||
@@ -28,6 +28,9 @@ ui: ## Build UI.
|
||||
cli: ## Build CLI.
|
||||
@echo "building cli"; cd cli && $(MAKE) build
|
||||
|
||||
cli-debug: ## Build CLI.
|
||||
@echo "building cli"; cd cli && $(MAKE) build-debug
|
||||
|
||||
build-cli-ci: ## Build CLI for CI.
|
||||
@echo "building cli for ci"; cd cli && $(MAKE) build GIT_BRANCH=ci SUFFIX=ci
|
||||
|
||||
@@ -37,6 +40,12 @@ agent: ## Build agent.
|
||||
${MAKE} extensions
|
||||
@ls -l agent/build
|
||||
|
||||
agent-debug: ## Build agent for debug.
|
||||
@(echo "building mizu agent for debug.." )
|
||||
@(cd agent; go build -gcflags="all=-N -l" -o build/mizuagent main.go)
|
||||
${MAKE} extensions-debug
|
||||
@ls -l agent/build
|
||||
|
||||
docker: ## Build and publish agent docker image.
|
||||
$(MAKE) push-docker
|
||||
|
||||
@@ -62,7 +71,7 @@ push-cli: ## Build and publish CLI.
|
||||
gsutil cp -r ./cli/bin/* gs://${BUCKET_PATH}/
|
||||
gsutil setmeta -r -h "Cache-Control:public, max-age=30" gs://${BUCKET_PATH}/\*
|
||||
|
||||
clean: clean-ui clean-agent clean-cli clean-docker ## Clean all build artifacts.
|
||||
clean: clean-ui clean-agent clean-cli clean-docker clean-extensions ## Clean all build artifacts.
|
||||
|
||||
clean-ui: ## Clean UI.
|
||||
@(rm -rf ui/build ; echo "UI cleanup done" )
|
||||
@@ -73,9 +82,15 @@ clean-agent: ## Clean agent.
|
||||
clean-cli: ## Clean CLI.
|
||||
@(cd cli; make clean ; echo "CLI cleanup done" )
|
||||
|
||||
clean-extensions: ## Clean extensions
|
||||
@(rm -rf tap/extensions/*.so ; echo "Extensions cleanup done" )
|
||||
|
||||
clean-docker:
|
||||
@(echo "DOCKER cleanup - NOT IMPLEMENTED YET " )
|
||||
|
||||
extensions-debug:
|
||||
devops/build_extensions_debug.sh
|
||||
|
||||
extensions:
|
||||
devops/build_extensions.sh
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/providers/tappersCount"
|
||||
"mizuserver/pkg/up9"
|
||||
"sync"
|
||||
|
||||
@@ -29,7 +30,7 @@ func init() {
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
if isTapper {
|
||||
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||
providers.TapperAdded()
|
||||
tappersCount.Add()
|
||||
} else {
|
||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
socketListLock.Lock()
|
||||
@@ -41,7 +42,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||
if isTapper {
|
||||
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
||||
providers.TapperRemoved()
|
||||
tappersCount.Remove()
|
||||
} else {
|
||||
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||
socketListLock.Lock()
|
||||
|
||||
@@ -11,18 +11,20 @@ import (
|
||||
"mizuserver/pkg/config"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/providers/tapConfig"
|
||||
"mizuserver/pkg/providers/tappedPods"
|
||||
"mizuserver/pkg/providers/tappersStatus"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
|
||||
var globalTapConfig = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
|
||||
var cancelTapperSyncer context.CancelFunc
|
||||
|
||||
func PostTapConfig(c *gin.Context) {
|
||||
tapConfig := &models.TapConfig{}
|
||||
requestTapConfig := &models.TapConfig{}
|
||||
|
||||
if err := c.Bind(tapConfig); err != nil {
|
||||
if err := c.Bind(requestTapConfig); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
@@ -30,14 +32,14 @@ func PostTapConfig(c *gin.Context) {
|
||||
if cancelTapperSyncer != nil {
|
||||
cancelTapperSyncer()
|
||||
|
||||
providers.TapStatus = shared.TapStatus{}
|
||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
||||
tappedPods.Set([]*shared.PodInfo{})
|
||||
tappersStatus.Reset()
|
||||
|
||||
broadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
var tappedNamespaces []string
|
||||
for namespace, tapped := range tapConfig.TappedNamespaces {
|
||||
for namespace, tapped := range requestTapConfig.TappedNamespaces {
|
||||
if tapped {
|
||||
tappedNamespaces = append(tappedNamespaces, namespace)
|
||||
}
|
||||
@@ -60,7 +62,7 @@ func PostTapConfig(c *gin.Context) {
|
||||
}
|
||||
|
||||
cancelTapperSyncer = cancel
|
||||
globalTapConfig = tapConfig
|
||||
tapConfig.Save(requestTapConfig)
|
||||
|
||||
c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
@@ -81,17 +83,19 @@ func GetTapConfig(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
savedTapConfig := tapConfig.Get()
|
||||
|
||||
tappedNamespaces := make(map[string]bool)
|
||||
for _, namespace := range namespaces {
|
||||
if namespace.Name == config.Config.MizuResourcesNamespace {
|
||||
continue
|
||||
}
|
||||
|
||||
tappedNamespaces[namespace.Name] = globalTapConfig.TappedNamespaces[namespace.Name]
|
||||
tappedNamespaces[namespace.Name] = savedTapConfig.TappedNamespaces[namespace.Name]
|
||||
}
|
||||
|
||||
tapConfig := models.TapConfig{TappedNamespaces: tappedNamespaces}
|
||||
c.JSON(http.StatusOK, tapConfig)
|
||||
tapConfigToReturn := models.TapConfig{TappedNamespaces: tappedNamespaces}
|
||||
c.JSON(http.StatusOK, tapConfigToReturn)
|
||||
}
|
||||
|
||||
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, serviceMesh bool) (*kubernetes.MizuTapperSyncer, error) {
|
||||
@@ -129,7 +133,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
return
|
||||
}
|
||||
|
||||
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
|
||||
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
||||
broadcastTappedPodsStatus()
|
||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||
if !ok {
|
||||
@@ -137,7 +141,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
return
|
||||
}
|
||||
|
||||
addTapperStatus(tapperStatus)
|
||||
tappersStatus.Set(&tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||
|
||||
@@ -8,43 +8,42 @@ import (
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/holder"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/providers/tappedPods"
|
||||
"mizuserver/pkg/providers/tappersCount"
|
||||
"mizuserver/pkg/providers/tappersStatus"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func HealthCheck(c *gin.Context) {
|
||||
tappers := make([]shared.TapperStatus, 0)
|
||||
for _, value := range providers.TappersStatus {
|
||||
tappers := make([]*shared.TapperStatus, 0)
|
||||
for _, value := range tappersStatus.Get() {
|
||||
tappers = append(tappers, value)
|
||||
}
|
||||
|
||||
response := shared.HealthResponse{
|
||||
TapStatus: providers.TapStatus,
|
||||
TappersCount: providers.TappersCount,
|
||||
TappedPods: tappedPods.Get(),
|
||||
TappersCount: tappersCount.Get(),
|
||||
TappersStatus: tappers,
|
||||
}
|
||||
c.JSON(http.StatusOK, response)
|
||||
}
|
||||
|
||||
func PostTappedPods(c *gin.Context) {
|
||||
tapStatus := &shared.TapStatus{}
|
||||
if err := c.Bind(tapStatus); err != nil {
|
||||
var requestTappedPods []*shared.PodInfo
|
||||
if err := c.Bind(&requestTappedPods); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
if err := validation.Validate(tapStatus); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
|
||||
providers.TapStatus.Pods = tapStatus.Pods
|
||||
|
||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
||||
tappedPods.Set(requestTappedPods)
|
||||
broadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func broadcastTappedPodsStatus() {
|
||||
tappedPodsStatus := utils.GetTappedPodsStatus()
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
|
||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
@@ -54,14 +53,6 @@ func broadcastTappedPodsStatus() {
|
||||
}
|
||||
}
|
||||
|
||||
func addTapperStatus(tapperStatus shared.TapperStatus) {
|
||||
if providers.TappersStatus == nil {
|
||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
||||
}
|
||||
|
||||
providers.TappersStatus[tapperStatus.NodeName] = tapperStatus
|
||||
}
|
||||
|
||||
func PostTapperStatus(c *gin.Context) {
|
||||
tapperStatus := &shared.TapperStatus{}
|
||||
if err := c.Bind(tapperStatus); err != nil {
|
||||
@@ -75,12 +66,12 @@ func PostTapperStatus(c *gin.Context) {
|
||||
}
|
||||
|
||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||
addTapperStatus(*tapperStatus)
|
||||
tappersStatus.Set(tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func GetTappersCount(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.TappersCount)
|
||||
c.JSON(http.StatusOK, tappersCount.Get())
|
||||
}
|
||||
|
||||
func GetAuthStatus(c *gin.Context) {
|
||||
@@ -94,7 +85,7 @@ func GetAuthStatus(c *gin.Context) {
|
||||
}
|
||||
|
||||
func GetTappingStatus(c *gin.Context) {
|
||||
tappedPodsStatus := utils.GetTappedPodsStatus()
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
c.JSON(http.StatusOK, tappedPodsStatus)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,19 +8,14 @@ import (
|
||||
"github.com/up9inc/mizu/tap"
|
||||
"mizuserver/pkg/models"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const tlsLinkRetainmentTime = time.Minute * 15
|
||||
|
||||
var (
|
||||
TappersCount int
|
||||
TapStatus shared.TapStatus
|
||||
TappersStatus map[string]shared.TapperStatus
|
||||
authStatus *models.AuthStatus
|
||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||
tappersCountLock = sync.Mutex{}
|
||||
authStatus *models.AuthStatus
|
||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||
)
|
||||
|
||||
func GetAuthStatus() (*models.AuthStatus, error) {
|
||||
@@ -68,15 +63,3 @@ func GetAllRecentTLSAddresses() []string {
|
||||
|
||||
return recentTLSLinks
|
||||
}
|
||||
|
||||
func TapperAdded() {
|
||||
tappersCountLock.Lock()
|
||||
TappersCount++
|
||||
tappersCountLock.Unlock()
|
||||
}
|
||||
|
||||
func TapperRemoved() {
|
||||
tappersCountLock.Lock()
|
||||
TappersCount--
|
||||
tappersCountLock.Unlock()
|
||||
}
|
||||
|
||||
54
agent/pkg/providers/tapConfig/tap_config_provider.go
Normal file
54
agent/pkg/providers/tapConfig/tap_config_provider.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package tapConfig
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"io/ioutil"
|
||||
"mizuserver/pkg/models"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const FilePath = shared.DataDirPath + "tap-config.json"
|
||||
|
||||
var lock = &sync.Mutex{}
|
||||
|
||||
var config *models.TapConfig
|
||||
|
||||
func Get() *models.TapConfig {
|
||||
if config == nil {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
if config == nil {
|
||||
if content, err := ioutil.ReadFile(FilePath); err != nil {
|
||||
config = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
|
||||
if !os.IsNotExist(err) {
|
||||
logger.Log.Errorf("Error loading tap config from file, err: %v", err)
|
||||
}
|
||||
} else {
|
||||
if err = json.Unmarshal(content, &config); err != nil {
|
||||
config = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
|
||||
logger.Log.Errorf("Error while unmarshal tap config, err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
func Save(tapConfigToSave *models.TapConfig) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
config = tapConfigToSave
|
||||
if data, err := json.Marshal(config); err != nil {
|
||||
logger.Log.Errorf("Error while marshal tap config, err: %v", err)
|
||||
} else {
|
||||
if err := ioutil.WriteFile(FilePath, data, 0644); err != nil {
|
||||
logger.Log.Errorf("Error writing tap config to file, err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
32
agent/pkg/providers/tappedPods/tapped_pods_provider.go
Normal file
32
agent/pkg/providers/tappedPods/tapped_pods_provider.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package tappedPods
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"mizuserver/pkg/providers/tappersStatus"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var tappedPods []*shared.PodInfo
|
||||
|
||||
func Get() []*shared.PodInfo {
|
||||
return tappedPods
|
||||
}
|
||||
|
||||
func Set(tappedPodsToSet []*shared.PodInfo) {
|
||||
tappedPods = tappedPodsToSet
|
||||
}
|
||||
|
||||
func GetTappedPodsStatus() []shared.TappedPodStatus {
|
||||
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
|
||||
for _, pod := range Get() {
|
||||
var status string
|
||||
if tapperStatus, ok := tappersStatus.Get()[pod.NodeName]; ok {
|
||||
status = strings.ToLower(tapperStatus.Status)
|
||||
}
|
||||
|
||||
isTapped := status == "running"
|
||||
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
|
||||
}
|
||||
|
||||
return tappedPodsStatus
|
||||
}
|
||||
25
agent/pkg/providers/tappersCount/tappers_count_provider.go
Normal file
25
agent/pkg/providers/tappersCount/tappers_count_provider.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package tappersCount
|
||||
|
||||
import "sync"
|
||||
|
||||
var lock = &sync.Mutex{}
|
||||
|
||||
var tappersCount int
|
||||
|
||||
func Add() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
tappersCount++
|
||||
}
|
||||
|
||||
func Remove() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
tappersCount--
|
||||
}
|
||||
|
||||
func Get() int {
|
||||
return tappersCount
|
||||
}
|
||||
25
agent/pkg/providers/tappersStatus/tappers_status_provider.go
Normal file
25
agent/pkg/providers/tappersStatus/tappers_status_provider.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package tappersStatus
|
||||
|
||||
import "github.com/up9inc/mizu/shared"
|
||||
|
||||
var tappersStatus map[string]*shared.TapperStatus
|
||||
|
||||
func Get() map[string]*shared.TapperStatus {
|
||||
if tappersStatus == nil {
|
||||
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||
}
|
||||
|
||||
return tappersStatus
|
||||
}
|
||||
|
||||
func Set(tapperStatus *shared.TapperStatus) {
|
||||
if tappersStatus == nil {
|
||||
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||
}
|
||||
|
||||
tappersStatus[tapperStatus.NodeName] = tapperStatus
|
||||
}
|
||||
|
||||
func Reset() {
|
||||
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||
}
|
||||
@@ -3,12 +3,10 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"mizuserver/pkg/providers"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -45,16 +43,6 @@ func StartServer(app *gin.Engine) {
|
||||
}
|
||||
}
|
||||
|
||||
func GetTappedPodsStatus() []shared.TappedPodStatus {
|
||||
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
|
||||
for _, pod := range providers.TapStatus.Pods {
|
||||
status := strings.ToLower(providers.TappersStatus[pod.NodeName].Status)
|
||||
isTapped := status == "running"
|
||||
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
|
||||
}
|
||||
return tappedPodsStatus
|
||||
}
|
||||
|
||||
func CheckErr(e error) {
|
||||
if e != nil {
|
||||
logger.Log.Errorf("%v", e)
|
||||
|
||||
@@ -14,8 +14,12 @@ help: ## This help.
|
||||
install:
|
||||
go install mizu.go
|
||||
|
||||
build-debug:
|
||||
export GCLFAGS='-gcflags="all=-N -l"'
|
||||
${MAKE} build
|
||||
|
||||
build: ## Build mizu CLI binary (select platform via GOOS / GOARCH env variables).
|
||||
go build -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' \
|
||||
go build ${GCLFAGS} -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' \
|
||||
-X 'github.com/up9inc/mizu/cli/mizu.Branch=$(GIT_BRANCH)' \
|
||||
-X 'github.com/up9inc/mizu/cli/mizu.BuildTimestamp=$(BUILD_TIMESTAMP)' \
|
||||
-X 'github.com/up9inc/mizu/cli/mizu.Platform=$(SUFFIX)' \
|
||||
|
||||
@@ -87,9 +87,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
||||
|
||||
podInfos := kubernetes.GetPodInfosForPods(pods)
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
|
||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
||||
if jsonValue, err := json.Marshal(podInfos); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
|
||||
@@ -89,6 +89,8 @@ func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Reso
|
||||
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
StandaloneMode: true,
|
||||
ServiceMap: config.Config.ServiceMap,
|
||||
OAS: config.Config.OAS,
|
||||
}
|
||||
|
||||
return &mizuAgentConfig
|
||||
@@ -99,7 +101,7 @@ func watchApiServerPodReady(ctx context.Context, kubernetesProvider *kubernetes.
|
||||
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||
|
||||
timeAfter := time.After(30 * time.Second)
|
||||
timeAfter := time.After(1 * time.Minute)
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-eventChan:
|
||||
|
||||
@@ -156,6 +156,8 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
|
||||
TapperResources: config.Config.Tap.TapperResources,
|
||||
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
ServiceMap: config.Config.ServiceMap,
|
||||
OAS: config.Config.OAS,
|
||||
}
|
||||
|
||||
return &mizuAgentConfig
|
||||
|
||||
@@ -34,9 +34,11 @@ type ConfigStruct struct {
|
||||
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
|
||||
HeadlessMode bool `yaml:"headless" default:"false"`
|
||||
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
||||
ServiceMap bool `yaml:"service-map" default:"false" readonly:""`
|
||||
OAS bool `yaml:"oas" default:"false" readonly:""`
|
||||
}
|
||||
|
||||
func(config *ConfigStruct) validate() error {
|
||||
func (config *ConfigStruct) validate() error {
|
||||
if _, err := logging.LogLevel(config.LogLevelStr); err != nil {
|
||||
return fmt.Errorf("%s is not a valid log level, err: %v", config.LogLevelStr, err)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
for f in tap/extensions/*; do
|
||||
if [ -d "$f" ]; then
|
||||
echo Building extension: $f
|
||||
extension=$(basename $f) && \
|
||||
cd tap/extensions/${extension} && \
|
||||
go build -buildmode=plugin -o ../${extension}.so . && \
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||

|
||||
|
||||
# Service mesh mutual tls (mtls) with Mizu
|
||||
|
||||
This document describe how Mizu tapper handles workloads configured with mtls, making the internal traffic between services in a cluster to be encrypted.
|
||||
|
||||
The list of service meshes supported by Mizu include:
|
||||
@@ -7,38 +9,67 @@ The list of service meshes supported by Mizu include:
|
||||
- Istio
|
||||
- Linkerd
|
||||
|
||||
In order to create a service mesh setup for development, follow those steps:
|
||||
1. Deploy a sample application to a Kubernetes cluster, the sample application needs to make internal service to service calls
|
||||
2. SSH to one of the nodes, and run `tcpdump`
|
||||
3. Make sure you see the internal service to service calls in a plain text
|
||||
4. Deploy a service mesh (Istio, Linkerd) to the cluster - make sure it is attached to all pods of the sample application, and that it is configured with mtls (default)
|
||||
5. Run `tcpdump` again, make sure you don't see the internal service to service calls in a plain text
|
||||
## Installation
|
||||
|
||||
### Optional: Allow source IP resolving in Istio
|
||||
|
||||
When using Istio, in order to enable Mizu to reslove source IPs to names, turn on the [use_remote_address](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-for) option in Istio sidecar Envoys.
|
||||
This setting causes the Envoys to append to `X-Forwarded-For` request header. Mizu in turn uses the `X-Forwarded-For` header to determine the true source IPs.
|
||||
One way to turn on the `use_remote_address` HTTP connection manager option is by applying an `EnvoyFilter`:
|
||||
|
||||
```yaml
|
||||
apiVersion: networking.istio.io/v1alpha3
|
||||
kind: EnvoyFilter
|
||||
metadata:
|
||||
name: mizu-xff
|
||||
namespace: istio-system # as defined in meshConfig resource.
|
||||
spec:
|
||||
configPatches:
|
||||
- applyTo: NETWORK_FILTER
|
||||
match:
|
||||
context: SIDECAR_OUTBOUND # will match outbound listeners in all sidecars
|
||||
patch:
|
||||
operation: MERGE
|
||||
value:
|
||||
typed_config:
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"
|
||||
use_remote_address: true
|
||||
```
|
||||
|
||||
Save the above text to `mizu-xff-envoyfilter.yaml` and run `kubectl apply -f mizu-xff-envoyfilter.yaml`.
|
||||
|
||||
With Istio, mizu does not resolve source IPs for non-HTTP traffic.
|
||||
|
||||
## Implementation
|
||||
|
||||
### Istio support
|
||||
|
||||
#### The connection between Istio and Envoy
|
||||
|
||||
In order to implement its service mesh capabilities, [Istio](https://istio.io) uses an [Envoy](https://www.envoyproxy.io) sidecar in front of every pod in the cluster. The Envoy is responsible for the mtls communication, and that's why we are focusing on Envoy proxy.
|
||||
|
||||
In the future we might see more players in that field, then we'll have to either add support for each of them or go with a unified eBPF solution.
|
||||
|
||||
#### Network namespaces
|
||||
|
||||
A [linux network namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html) is an isolation that limit the process view of the network. In the container world it used to isolate one container from another. In the Kubernetes world it used to isolate a pod from another. That means that two containers running on the same pod share the same network namespace. A container can reach a container in the same pod by accessing `localhost`.
|
||||
|
||||
An Envoy proxy configured with mtls receives the inbound traffic directed to the pod, decrypts it and sends it via `localhost` to the target container.
|
||||
|
||||
#### Tapping mtls traffic
|
||||
In order for Mizu to be able to see the decrypted traffic it needs to listen on the same network namespace of the target pod. Multiple threads of the same process can have different network namespaces.
|
||||
|
||||
In order for Mizu to be able to see the decrypted traffic it needs to listen on the same network namespace of the target pod. Multiple threads of the same process can have different network namespaces.
|
||||
|
||||
[gopacket](https://github.com/google/gopacket) uses [libpacp](https://github.com/the-tcpdump-group/libpcap) by default for capturing the traffic. Libpacap doesn't support network namespaces and we can't ask it to listen to traffic on a different namespace. However, we can change the network namespace of the calling thread and then start libpcap to see the traffic on a different namespace.
|
||||
|
||||
#### Finding the network namespace of a running process
|
||||
|
||||
The network namespace of a running process can be found in `/proc/PID/ns/net` link. Once we have this link, we can ask Linux to change the network namespace of a thread to this one.
|
||||
|
||||
This mean that Mizu needs to have access to the `/proc` (procfs) of the running node.
|
||||
|
||||
#### Finding the network namespace of a running pod
|
||||
|
||||
In order for Mizu to be able to listen to mtls traffic, it needs to get the PIDs of the the running pods, filter them according to the user filters and then start listen to their internal network namespace traffic.
|
||||
|
||||
There is no official way in Kubernetes to get from pod to PID. The CRI implementation purposefully doesn't force a pod to be a processes on the host. It can be a Virtual Machine as well like [Kata containers](https://katacontainers.io)
|
||||
@@ -50,4 +81,15 @@ Once Mizu detects an Envoy process, it need to check whether this specific Envoy
|
||||
Istio sends an `INSTANCE_IP` environment variable to every Envoy proxy process. By examining the Envoy process's environment variables we can see whether it's relevant or not. Examining a process environment variables is done by reading the `/proc/PID/envion` file.
|
||||
|
||||
#### Edge cases
|
||||
|
||||
The method we use to find Envoy processes and correlate them to the cluster IPs may be inaccurate in certain situations. If, for example, a user runs an Envoy process manually, and set its `INSTANCE_IP` environment variable to one of the `CLUSTER_IPS` the tapper gets, then Mizu will capture traffic for it.
|
||||
|
||||
## Development
|
||||
|
||||
In order to create a service mesh setup for development, follow those steps:
|
||||
|
||||
1. Deploy a sample application to a Kubernetes cluster, the sample application needs to make internal service to service calls
|
||||
2. SSH to one of the nodes, and run `tcpdump`
|
||||
3. Make sure you see the internal service to service calls in a plain text
|
||||
4. Deploy a service mesh (Istio, Linkerd) to the cluster - make sure it is attached to all pods of the sample application, and that it is configured with mtls (default)
|
||||
5. Run `tcpdump` again, make sure you don't see the internal service to service calls in a plain text
|
||||
|
||||
@@ -73,10 +73,10 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
return missingPods
|
||||
}
|
||||
|
||||
func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo {
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
func GetPodInfosForPods(pods []core.Pod) []*shared.PodInfo {
|
||||
podInfos := make([]*shared.PodInfo, 0)
|
||||
for _, pod := range pods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName})
|
||||
podInfos = append(podInfos, &shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName})
|
||||
}
|
||||
return podInfos
|
||||
}
|
||||
|
||||
@@ -41,6 +41,8 @@ type MizuAgentConfig struct {
|
||||
MizuResourcesNamespace string `json:"mizuResourceNamespace"`
|
||||
AgentDatabasePath string `json:"agentDatabasePath"`
|
||||
StandaloneMode bool `json:"standaloneMode"`
|
||||
ServiceMap bool `json:"serviceMap"`
|
||||
OAS bool `json:"oas"`
|
||||
}
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
@@ -81,10 +83,6 @@ type TappedPodStatus struct {
|
||||
IsTapped bool `json:"isTapped"`
|
||||
}
|
||||
|
||||
type TapStatus struct {
|
||||
Pods []PodInfo `json:"pods"`
|
||||
}
|
||||
|
||||
type PodInfo struct {
|
||||
Namespace string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
@@ -124,9 +122,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
|
||||
}
|
||||
|
||||
type HealthResponse struct {
|
||||
TapStatus TapStatus `json:"tapStatus"`
|
||||
TappersCount int `json:"tappersCount"`
|
||||
TappersStatus []TapperStatus `json:"tappersStatus"`
|
||||
TappedPods []*PodInfo `json:"tappedPods"`
|
||||
TappersCount int `json:"tappersCount"`
|
||||
TappersStatus []*TapperStatus `json:"tappersStatus"`
|
||||
}
|
||||
|
||||
type VersionResponse struct {
|
||||
|
||||
@@ -21,9 +21,32 @@ func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *ap
|
||||
FilterSensitiveData(item, options)
|
||||
}
|
||||
|
||||
replaceForwardedFor(item)
|
||||
|
||||
emitter.Emit(item)
|
||||
}
|
||||
|
||||
func replaceForwardedFor(item *api.OutputChannelItem) {
|
||||
if item.Protocol.Name != "http" {
|
||||
return
|
||||
}
|
||||
|
||||
request := item.Pair.Request.Payload.(api.HTTPPayload).Data.(*http.Request)
|
||||
|
||||
forwardedFor := request.Header.Get("X-Forwarded-For")
|
||||
if forwardedFor == "" {
|
||||
return
|
||||
}
|
||||
|
||||
ips := strings.Split(forwardedFor, ",")
|
||||
lastIP := strings.TrimSpace(ips[0])
|
||||
|
||||
item.ConnectionInfo.ClientIP = lastIP
|
||||
// Erase the port field. Because the proxy terminates the connection from the client, the port that we see here
|
||||
// is not the source port on the client side.
|
||||
item.ConnectionInfo.ClientPort = ""
|
||||
}
|
||||
|
||||
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
|
||||
if err != nil {
|
||||
|
||||
@@ -216,7 +216,7 @@ export const EntryItem: React.FC<EntryProps> = ({entry, focusedEntryId, setFocus
|
||||
{entry.src.ip}
|
||||
</span>
|
||||
</Queryable>
|
||||
<span className={`${styles.tcpInfo}`} style={{marginTop: "18px"}}>:</span>
|
||||
<span className={`${styles.tcpInfo}`} style={{marginTop: "18px"}}>{entry.src.port ? ":" : ""}</span>
|
||||
<Queryable
|
||||
query={`src.port == "${entry.src.port}"`}
|
||||
updateQuery={updateQuery}
|
||||
|
||||
Reference in New Issue
Block a user