mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-17 00:51:06 +00:00
Compare commits
3 Commits
26.0-dev22
...
26.0-dev25
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e62eaf4de | ||
|
|
81e830dd18 | ||
|
|
88e3fba1ef |
@@ -34,9 +34,12 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
tappers.Connected()
|
||||
} else {
|
||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
|
||||
socketListLock.Lock()
|
||||
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
||||
socketListLock.Unlock()
|
||||
|
||||
BroadcastTappedPodsStatus()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
20
agent/pkg/api/utils.go
Normal file
20
agent/pkg/api/utils.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func BroadcastTappedPodsStatus() {
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
|
||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||
} else {
|
||||
BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||
@@ -36,7 +37,7 @@ func PostTapConfig(c *gin.Context) {
|
||||
tappedPods.Set([]*shared.PodInfo{})
|
||||
tappers.ResetStatus()
|
||||
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
var tappedNamespaces []string
|
||||
@@ -135,7 +136,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
}
|
||||
|
||||
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||
if !ok {
|
||||
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
||||
@@ -143,7 +144,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
}
|
||||
|
||||
tappers.SetStatus(&tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||
return
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -39,18 +38,7 @@ func PostTappedPods(c *gin.Context) {
|
||||
|
||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
||||
tappedPods.Set(requestTappedPods)
|
||||
broadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func broadcastTappedPodsStatus() {
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
|
||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||
} else {
|
||||
api.BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func PostTapperStatus(c *gin.Context) {
|
||||
@@ -67,7 +55,7 @@ func PostTapperStatus(c *gin.Context) {
|
||||
|
||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||
tappers.SetStatus(tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func GetConnectedTappersCount(c *gin.Context) {
|
||||
|
||||
@@ -31,7 +31,7 @@ func runMizuCheck() {
|
||||
}
|
||||
|
||||
if checkPassed {
|
||||
checkPassed = checkAllResourcesExist(ctx, kubernetesProvider, isInstallCommand)
|
||||
checkPassed = checkK8sResources(ctx, kubernetesProvider, isInstallCommand)
|
||||
}
|
||||
|
||||
if checkPassed {
|
||||
@@ -66,7 +66,7 @@ func checkKubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
|
||||
}
|
||||
|
||||
func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, bool) {
|
||||
logger.Log.Infof("\nmizu-mode\n--------------------")
|
||||
logger.Log.Infof("\nmode\n--------------------")
|
||||
|
||||
if exist, err := kubernetesProvider.DoesDeploymentExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf("%v can't check mizu command, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
|
||||
@@ -79,7 +79,7 @@ func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider)
|
||||
return false, false
|
||||
} else if exist {
|
||||
logger.Log.Infof("%v mizu running with tap command", fmt.Sprintf(uiUtils.Green, "√"))
|
||||
return true, true
|
||||
return true, false
|
||||
} else {
|
||||
logger.Log.Infof("%v mizu is not running", fmt.Sprintf(uiUtils.Red, "✗"))
|
||||
return false, false
|
||||
@@ -99,9 +99,9 @@ func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
|
||||
}
|
||||
|
||||
func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
|
||||
logger.Log.Infof("\nmizu-connectivity\n--------------------")
|
||||
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
|
||||
|
||||
serverUrl := GetApiServerUrl()
|
||||
serverUrl := GetApiServerUrl(config.Config.Tap.GuiPort)
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(); err == nil {
|
||||
@@ -169,8 +169,8 @@ func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider)
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
|
||||
logger.Log.Infof("\nmizu-existence\n--------------------")
|
||||
func checkK8sResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
|
||||
logger.Log.Infof("\nk8s-components\n--------------------")
|
||||
|
||||
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
|
||||
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
|
||||
@@ -227,7 +227,43 @@ func checkTapResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.
|
||||
exist, err := kubernetesProvider.DoesPodExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
|
||||
tapResourcesExist := checkResourceExist(kubernetes.ApiServerPodName, "pod", exist, err)
|
||||
|
||||
return tapResourcesExist
|
||||
if !tapResourcesExist {
|
||||
return false
|
||||
}
|
||||
|
||||
if pod, err := kubernetesProvider.GetPod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf("%v error checking if '%v' pod exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
|
||||
return false
|
||||
} else if kubernetes.IsPodRunning(pod) {
|
||||
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
|
||||
} else {
|
||||
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
|
||||
return false
|
||||
}
|
||||
|
||||
tapperRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperPodName))
|
||||
if pods, err := kubernetesProvider.ListAllPodsMatchingRegex(ctx, tapperRegex, []string{config.Config.MizuResourcesNamespace}); err != nil {
|
||||
logger.Log.Errorf("%v error listing '%v' pods, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
|
||||
return false
|
||||
} else {
|
||||
tappers := 0
|
||||
notRunningTappers := 0
|
||||
|
||||
for _, pod := range pods {
|
||||
tappers += 1
|
||||
if !kubernetes.IsPodRunning(&pod) {
|
||||
notRunningTappers += 1
|
||||
}
|
||||
}
|
||||
|
||||
if tappers != notRunningTappers {
|
||||
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
|
||||
return false
|
||||
}
|
||||
|
||||
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
|
||||
|
||||
@@ -23,12 +23,12 @@ import (
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func GetApiServerUrl() string {
|
||||
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
|
||||
func GetApiServerUrl(port uint16) string {
|
||||
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(port))
|
||||
}
|
||||
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc) {
|
||||
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, port uint16) {
|
||||
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, port, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
@@ -36,7 +36,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
|
||||
if err := httpServer.Shutdown(ctx); err != nil {
|
||||
@@ -44,14 +44,14 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
}
|
||||
|
||||
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
|
||||
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel); err != nil {
|
||||
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, port, ctx, cancel); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
|
||||
@@ -45,7 +45,7 @@ var apiProvider *apiserver.Provider
|
||||
func RunMizuTap() {
|
||||
state.startTime = time.Now()
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(config.Config.Tap.GuiPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
|
||||
var err error
|
||||
var serializedValidationRules string
|
||||
@@ -421,7 +421,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
||||
}
|
||||
|
||||
func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.Tap.GuiPort)
|
||||
|
||||
options, _ := getMizuApiFilteringOptions()
|
||||
if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
|
||||
@@ -429,7 +429,7 @@ func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
||||
cancel()
|
||||
}
|
||||
|
||||
url := GetApiServerUrl()
|
||||
url := GetApiServerUrl(config.Config.Tap.GuiPort)
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
|
||||
@@ -39,7 +39,7 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
|
||||
url = GetApiServerUrl()
|
||||
url = GetApiServerUrl(config.Config.View.GuiPort)
|
||||
|
||||
response, err := http.Get(fmt.Sprintf("%s/", url))
|
||||
if err == nil && response.StatusCode == 200 {
|
||||
@@ -47,7 +47,7 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("Establishing connection to k8s cluster...")
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.View.GuiPort)
|
||||
}
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
|
||||
@@ -1000,7 +1000,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
|
||||
|
||||
matchingPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods {
|
||||
if isPodRunning(&pod) {
|
||||
if IsPodRunning(&pod) {
|
||||
matchingPods = append(matchingPods, pod)
|
||||
}
|
||||
}
|
||||
@@ -1190,6 +1190,6 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
|
||||
)
|
||||
}
|
||||
|
||||
func isPodRunning(pod *core.Pod) bool {
|
||||
func IsPodRunning(pod *core.Pod) bool {
|
||||
return pod.Status.Phase == core.PodRunning
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user