mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-15 02:19:54 +00:00
Compare commits
20 Commits
26.0-dev21
...
28.0-dev1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5dfa94d76e | ||
|
|
dfe63f2318 | ||
|
|
9cf64a43f5 | ||
|
|
bf2362d836 | ||
|
|
1c11523d9d | ||
|
|
150a87413d | ||
|
|
f7221a7355 | ||
|
|
4197ec198c | ||
|
|
5484b7c491 | ||
|
|
041223b558 | ||
|
|
71c04d20ef | ||
|
|
8852bac77b | ||
|
|
59d21e19b7 | ||
|
|
884cb791fc | ||
|
|
cb332cedd4 | ||
|
|
391af95fb5 | ||
|
|
9e62eaf4de | ||
|
|
81e830dd18 | ||
|
|
88e3fba1ef | ||
|
|
a4066da5d7 |
@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
|
||||
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.4.14/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.4.14/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.4.16/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.4.16/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
|
||||
RUN chmod +x ./basenine_linux_${GOARCH}
|
||||
RUN mv ./basenine_linux_${GOARCH} ./basenine
|
||||
|
||||
@@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
|
||||
for item := range outputItems {
|
||||
extension := extensionsMap[item.Protocol.Name]
|
||||
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
|
||||
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
|
||||
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
|
||||
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
|
||||
if extension.Protocol.Name == "http" {
|
||||
if !disableOASValidation {
|
||||
var httpPair tapApi.HTTPRequestResponsePair
|
||||
@@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
}
|
||||
}
|
||||
|
||||
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
|
||||
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
|
||||
if k8sResolver != nil {
|
||||
unresolvedSource := connectionInfo.ClientIP
|
||||
resolvedSource = k8sResolver.Resolve(unresolvedSource)
|
||||
if resolvedSource == "" {
|
||||
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
|
||||
if resolvedSourceObject == nil {
|
||||
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
resolvedSource = resolvedSourceObject.FullAddress
|
||||
}
|
||||
|
||||
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
|
||||
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
|
||||
if resolvedDestination == "" {
|
||||
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
|
||||
if resolvedDestinationObject == nil {
|
||||
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
resolvedDestination = resolvedDestinationObject.FullAddress
|
||||
namespace = resolvedDestinationObject.Namespace
|
||||
}
|
||||
}
|
||||
return resolvedSource, resolvedDestination
|
||||
return resolvedSource, resolvedDestination, namespace
|
||||
}
|
||||
|
||||
func CheckIsServiceIP(address string) bool {
|
||||
|
||||
@@ -34,9 +34,12 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
tappers.Connected()
|
||||
} else {
|
||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
|
||||
socketListLock.Lock()
|
||||
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
||||
socketListLock.Unlock()
|
||||
|
||||
BroadcastTappedPodsStatus()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
}
|
||||
|
||||
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
|
||||
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
|
||||
if resolvedName != "" {
|
||||
outboundLinkMessage.Data.DstIP = resolvedName
|
||||
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
|
||||
if resolvedNameObject != nil {
|
||||
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
|
||||
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
|
||||
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
|
||||
}
|
||||
|
||||
20
agent/pkg/api/utils.go
Normal file
20
agent/pkg/api/utils.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func BroadcastTappedPodsStatus() {
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
|
||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||
} else {
|
||||
BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||
@@ -36,7 +37,7 @@ func PostTapConfig(c *gin.Context) {
|
||||
tappedPods.Set([]*shared.PodInfo{})
|
||||
tappers.ResetStatus()
|
||||
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
var tappedNamespaces []string
|
||||
@@ -135,7 +136,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
}
|
||||
|
||||
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||
if !ok {
|
||||
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
||||
@@ -143,7 +144,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
||||
}
|
||||
|
||||
tappers.SetStatus(&tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||
return
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -39,18 +38,7 @@ func PostTappedPods(c *gin.Context) {
|
||||
|
||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
||||
tappedPods.Set(requestTappedPods)
|
||||
broadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func broadcastTappedPodsStatus() {
|
||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||
|
||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||
} else {
|
||||
api.BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func PostTapperStatus(c *gin.Context) {
|
||||
@@ -67,7 +55,7 @@ func PostTapperStatus(c *gin.Context) {
|
||||
|
||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||
tappers.SetStatus(tapperStatus)
|
||||
broadcastTappedPodsStatus()
|
||||
api.BroadcastTappedPodsStatus()
|
||||
}
|
||||
|
||||
func GetConnectedTappersCount(c *gin.Context) {
|
||||
|
||||
@@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
|
||||
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
|
||||
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
|
||||
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
|
||||
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
|
||||
|
||||
if c.Request.Method == "OPTIONS" {
|
||||
c.AbortWithStatus(204)
|
||||
|
||||
@@ -30,6 +30,11 @@ type Resolver struct {
|
||||
namespace string
|
||||
}
|
||||
|
||||
type ResolvedObjectInfo struct {
|
||||
FullAddress string
|
||||
Namespace string
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Start(ctx context.Context) {
|
||||
if !resolver.isStarted {
|
||||
resolver.isStarted = true
|
||||
@@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Resolve(name string) string {
|
||||
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
|
||||
resolvedName, isFound := resolver.nameMap.Get(name)
|
||||
if !isFound {
|
||||
return ""
|
||||
return nil
|
||||
}
|
||||
return resolvedName.(string)
|
||||
return resolvedName.(*ResolvedObjectInfo)
|
||||
}
|
||||
|
||||
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
|
||||
@@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
pod := event.Object.(*corev1.Pod)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
@@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
|
||||
}
|
||||
if subset.Addresses != nil {
|
||||
for _, address := range subset.Addresses {
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
|
||||
for _, port := range ports {
|
||||
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
service := event.Object.(*corev1.Service)
|
||||
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
|
||||
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
|
||||
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
|
||||
if service.Spec.Ports != nil {
|
||||
for _, port := range service.Spec.Ports {
|
||||
if port.Port > 0 {
|
||||
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
|
||||
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
|
||||
}
|
||||
if service.Status.LoadBalancer.Ingress != nil {
|
||||
for _, ingress := range service.Status.LoadBalancer.Ingress {
|
||||
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
|
||||
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
@@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
resolver.nameMap.Remove(key)
|
||||
logger.Log.Infof("setting %s=nil", key)
|
||||
} else {
|
||||
resolver.nameMap.Set(key, resolved)
|
||||
|
||||
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
|
||||
logger.Log.Infof("setting %s=%s", key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
|
||||
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
resolver.serviceMap.Remove(key)
|
||||
} else {
|
||||
resolver.serviceMap.Set(key, resolved)
|
||||
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Mizu release _VER_
|
||||
Full changelog for stable release see in [docs](https://github.com/up9inc/mizu/blob/main/docs/CHANGELOG.md)
|
||||
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)
|
||||
|
||||
## Download Mizu for your platform
|
||||
|
||||
|
||||
@@ -4,9 +4,11 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared/kubernetes"
|
||||
@@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error {
|
||||
|
||||
func (provider *Provider) isReachable() (bool, error) {
|
||||
echoUrl := fmt.Sprintf("%s/echo", provider.url)
|
||||
if response, err := provider.client.Get(echoUrl); err != nil {
|
||||
if _, err := provider.get(echoUrl); err != nil {
|
||||
return false, err
|
||||
} else if response.StatusCode != 200 {
|
||||
return false, fmt.Errorf("invalid status code %v", response.StatusCode)
|
||||
} else {
|
||||
return true, nil
|
||||
}
|
||||
@@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
|
||||
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapper status %w", err)
|
||||
} else {
|
||||
if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
|
||||
return nil
|
||||
@@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
if jsonValue, err := json.Marshal(podInfos); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
|
||||
return nil
|
||||
@@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
|
||||
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
|
||||
|
||||
response, requestErr := provider.client.Get(generalStatsUrl)
|
||||
response, requestErr := provider.get(generalStatsUrl)
|
||||
if requestErr != nil {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
|
||||
} else if response.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
|
||||
}
|
||||
|
||||
defer response.Body.Close()
|
||||
@@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) {
|
||||
Method: http.MethodGet,
|
||||
URL: versionUrl,
|
||||
}
|
||||
statusResp, err := provider.client.Do(req)
|
||||
statusResp, err := provider.do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) {
|
||||
|
||||
return versionResponse.Ver, nil
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) get(url string) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Get(url))
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Post(url, contentType, body))
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Do(req))
|
||||
}
|
||||
|
||||
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
|
||||
if (errInOperation != nil) {
|
||||
return response, errInOperation
|
||||
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
|
||||
} else if response.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
response.Body.Close()
|
||||
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
if err != nil {
|
||||
return response, err
|
||||
}
|
||||
|
||||
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
|
||||
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func runMizuCheck() {
|
||||
}
|
||||
|
||||
if checkPassed {
|
||||
checkPassed = checkAllResourcesExist(ctx, kubernetesProvider, isInstallCommand)
|
||||
checkPassed = checkK8sResources(ctx, kubernetesProvider, isInstallCommand)
|
||||
}
|
||||
|
||||
if checkPassed {
|
||||
@@ -66,7 +66,7 @@ func checkKubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
|
||||
}
|
||||
|
||||
func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, bool) {
|
||||
logger.Log.Infof("\nmizu-mode\n--------------------")
|
||||
logger.Log.Infof("\nmode\n--------------------")
|
||||
|
||||
if exist, err := kubernetesProvider.DoesDeploymentExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf("%v can't check mizu command, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
|
||||
@@ -79,7 +79,7 @@ func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider)
|
||||
return false, false
|
||||
} else if exist {
|
||||
logger.Log.Infof("%v mizu running with tap command", fmt.Sprintf(uiUtils.Green, "√"))
|
||||
return true, true
|
||||
return true, false
|
||||
} else {
|
||||
logger.Log.Infof("%v mizu is not running", fmt.Sprintf(uiUtils.Red, "✗"))
|
||||
return false, false
|
||||
@@ -99,9 +99,9 @@ func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
|
||||
}
|
||||
|
||||
func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
|
||||
logger.Log.Infof("\nmizu-connectivity\n--------------------")
|
||||
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
|
||||
|
||||
serverUrl := GetApiServerUrl()
|
||||
serverUrl := GetApiServerUrl(config.Config.Tap.GuiPort)
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(); err == nil {
|
||||
@@ -169,8 +169,8 @@ func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider)
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
|
||||
logger.Log.Infof("\nmizu-existence\n--------------------")
|
||||
func checkK8sResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
|
||||
logger.Log.Infof("\nk8s-components\n--------------------")
|
||||
|
||||
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
|
||||
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
|
||||
@@ -227,7 +227,43 @@ func checkTapResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.
|
||||
exist, err := kubernetesProvider.DoesPodExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
|
||||
tapResourcesExist := checkResourceExist(kubernetes.ApiServerPodName, "pod", exist, err)
|
||||
|
||||
return tapResourcesExist
|
||||
if !tapResourcesExist {
|
||||
return false
|
||||
}
|
||||
|
||||
if pod, err := kubernetesProvider.GetPod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf("%v error checking if '%v' pod exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
|
||||
return false
|
||||
} else if kubernetes.IsPodRunning(pod) {
|
||||
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
|
||||
} else {
|
||||
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
|
||||
return false
|
||||
}
|
||||
|
||||
tapperRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperPodName))
|
||||
if pods, err := kubernetesProvider.ListAllPodsMatchingRegex(ctx, tapperRegex, []string{config.Config.MizuResourcesNamespace}); err != nil {
|
||||
logger.Log.Errorf("%v error listing '%v' pods, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
|
||||
return false
|
||||
} else {
|
||||
tappers := 0
|
||||
notRunningTappers := 0
|
||||
|
||||
for _, pod := range pods {
|
||||
tappers += 1
|
||||
if !kubernetes.IsPodRunning(&pod) {
|
||||
notRunningTappers += 1
|
||||
}
|
||||
}
|
||||
|
||||
if notRunningTappers > 0 {
|
||||
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
|
||||
return false
|
||||
}
|
||||
|
||||
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
|
||||
|
||||
@@ -23,12 +23,12 @@ import (
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func GetApiServerUrl() string {
|
||||
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
|
||||
func GetApiServerUrl(port uint16) string {
|
||||
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(port))
|
||||
}
|
||||
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc) {
|
||||
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, port uint16) {
|
||||
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, port, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
@@ -36,7 +36,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
|
||||
if err := httpServer.Shutdown(ctx); err != nil {
|
||||
@@ -44,14 +44,14 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
}
|
||||
|
||||
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
|
||||
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel); err != nil {
|
||||
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, port, ctx, cancel); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
var installCmd = &cobra.Command{
|
||||
@@ -12,13 +11,13 @@ var installCmd = &cobra.Command{
|
||||
Short: "Installs mizu components",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("install", nil)
|
||||
runMizuInstall()
|
||||
return nil
|
||||
},
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
if config.Config.IsNsRestrictedMode() {
|
||||
return fmt.Errorf("install is not supported in restricted namespace mode")
|
||||
}
|
||||
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
|
||||
|
||||
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
|
||||
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
|
||||
|
||||
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
|
||||
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace")
|
||||
|
||||
return nil
|
||||
},
|
||||
@@ -27,4 +26,3 @@ var installCmd = &cobra.Command{
|
||||
func init() {
|
||||
rootCmd.AddCommand(installCmd)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,81 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/errormessage"
|
||||
"github.com/up9inc/mizu/cli/resources"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func runMizuInstall() {
|
||||
kubernetesProvider, err := getKubernetesProviderForCli()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
var serializedValidationRules string
|
||||
var serializedContract string
|
||||
|
||||
var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000
|
||||
|
||||
defaultResources := shared.Resources{}
|
||||
if err := defaults.Set(&defaultResources); err != nil {
|
||||
logger.Log.Debug(err)
|
||||
}
|
||||
|
||||
mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources)
|
||||
serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules,
|
||||
serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(),
|
||||
config.Config.MizuResourcesNamespace, config.Config.AgentImage,
|
||||
config.Config.KratosImage, config.Config.KetoImage,
|
||||
nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(),
|
||||
config.Config.LogLevel(), false); err != nil {
|
||||
var statusError *k8serrors.StatusError
|
||||
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
|
||||
logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance")
|
||||
} else {
|
||||
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
|
||||
}
|
||||
|
||||
func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig {
|
||||
mizuAgentConfig := shared.MizuAgentConfig{
|
||||
MaxDBSizeBytes: maxDBSizeBytes,
|
||||
AgentImage: config.Config.AgentImage,
|
||||
PullPolicy: config.Config.ImagePullPolicyStr,
|
||||
LogLevel: config.Config.LogLevel(),
|
||||
TapperResources: tapperResources,
|
||||
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
StandaloneMode: true,
|
||||
ServiceMap: config.Config.ServiceMap,
|
||||
OAS: config.Config.OAS,
|
||||
Elastic: config.Config.Elastic,
|
||||
}
|
||||
|
||||
return &mizuAgentConfig
|
||||
}
|
||||
@@ -45,7 +45,7 @@ var apiProvider *apiserver.Provider
|
||||
func RunMizuTap() {
|
||||
state.startTime = time.Now()
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(config.Config.Tap.GuiPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
|
||||
var err error
|
||||
var serializedValidationRules string
|
||||
@@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
ServiceMap: config.Config.ServiceMap,
|
||||
OAS: config.Config.OAS,
|
||||
Telemetry: config.Config.Telemetry,
|
||||
Elastic: config.Config.Elastic,
|
||||
}
|
||||
|
||||
@@ -421,7 +422,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
||||
}
|
||||
|
||||
func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.Tap.GuiPort)
|
||||
|
||||
options, _ := getMizuApiFilteringOptions()
|
||||
if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
|
||||
@@ -429,7 +430,7 @@ func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
||||
cancel()
|
||||
}
|
||||
|
||||
url := GetApiServerUrl()
|
||||
url := GetApiServerUrl(config.Config.Tap.GuiPort)
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
|
||||
@@ -3,13 +3,13 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/utils"
|
||||
"net/http"
|
||||
|
||||
"github.com/up9inc/mizu/cli/utils"
|
||||
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu/version"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared/kubernetes"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
@@ -39,7 +39,7 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
|
||||
url = GetApiServerUrl()
|
||||
url = GetApiServerUrl(config.Config.View.GuiPort)
|
||||
|
||||
response, err := http.Get(fmt.Sprintf("%s/", url))
|
||||
if err == nil && response.StatusCode == 200 {
|
||||
@@ -47,7 +47,7 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("Establishing connection to k8s cluster...")
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.View.GuiPort)
|
||||
}
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
@@ -62,14 +62,5 @@ func runMizuView() {
|
||||
uiUtils.OpenBrowser(url)
|
||||
}
|
||||
|
||||
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
|
||||
logger.Log.Errorf("Failed to check versions compatibility %v", err)
|
||||
cancel()
|
||||
return
|
||||
} else if !isCompatible {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
utils.WaitForFinish(ctx, cancel)
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ type ConfigStruct struct {
|
||||
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
|
||||
HeadlessMode bool `yaml:"headless" default:"false"`
|
||||
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
||||
ServiceMap bool `yaml:"service-map" default:"false"`
|
||||
ServiceMap bool `yaml:"service-map" default:"true"`
|
||||
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
|
||||
Elastic shared.ElasticConfig `yaml:"elastic"`
|
||||
}
|
||||
|
||||
@@ -1,72 +1 @@
|
||||
# CHANGELOG
|
||||
This document summarizes main and fixes changes published in stable (aka `main`) branch of this project.
|
||||
Ongoing work and development releases are under `develop` branch.
|
||||
|
||||
## 0.24.0
|
||||
|
||||
### main features
|
||||
* ARM64 support -- Mizu is now available for ARM 64bit architecture
|
||||
* Now you can run Mizu with `minikube` on your Apple M1 laptop or any other ARM-based hosts
|
||||
* New command helps user verify Mizu deployment
|
||||
* Run `mizu check` to verify Mizu was deployed successfully
|
||||
* `mizu check` verifies version compatibility, resources and permissions required by Mizu
|
||||
* EXPERIMENTAL: Service Map - graph of all service interactions
|
||||
* Arrow direction show client to server connection
|
||||
* Graph edge width reflects volume of traffic captured between the services
|
||||
* to enable this experimental feature use `--set service-map=true` flag
|
||||
|
||||
### improvements
|
||||
* Mizu container images are now served from [Docker Hub](https://hub.docker.com/r/up9inc/mizu), as multi-architecture images (arm64, amd64)
|
||||
* in Mizu GUI the filter query can now be applied by pressing CONTROL/COMMAND + ENTER
|
||||
* try port-forwarding if http-proxy connection to Mizu API server is not available
|
||||
|
||||
### notable bug fixes
|
||||
* Fixed HTTP/1.0 presentation which was shown as HTTP/1.1
|
||||
* Fixed handling of long-living TCP connections, improves capturing gRPC and HTTP/2 traffic, and helps in service-mesh setups (istio, linkerd)
|
||||
|
||||
|
||||
## 0.23.0
|
||||
### notable bug fixes
|
||||
* fixed errors in Redis protocol parser (better handling of Array and Bulk String message types)
|
||||
|
||||
|
||||
|
||||
## 0.22.0
|
||||
|
||||
### main features
|
||||
* Service Mesh support -- mizu is now capable to tap mTLS traffic between pods connected by Istio service mesh
|
||||
* Use `--service-mesh` option to enable this feature
|
||||
* New installation option - have the same Mizu functionality as long living pods in your cluster, with password protection
|
||||
* To install use `mizu install` command
|
||||
* To access use `mizu view` or `kubectl -n mizu port-forward svc/mizu-api-server`
|
||||
* To uninstall run `mizu clean`
|
||||
* At first login
|
||||
* Set admin password as prompted, use it to login to mizu later on.
|
||||
* After login, user should select cluster namespaces to tap: by default all namespaces in the cluster are selected, user can select/unselect according to their needs. These settings are retained and can be modified at any time via Settings menu (cog icon on the top-right)
|
||||
|
||||
|
||||
### improvements
|
||||
* improved Mizu permissions/roles logic to support clusters with strict PodSecurityPolicy (PSP) -- see [PERMISSIONS](PERMISSIONS.md) doc for more details
|
||||
|
||||
### notable bug fixes
|
||||
* mizu now works properly when API service is exposed via HTTPS url
|
||||
* mizu now properly displays KAFKA message body
|
||||
|
||||
|
||||
|
||||
|
||||
## 0.21.0
|
||||
|
||||
### main features
|
||||
* New traffic search & stream exprience
|
||||
* Rich query language with full-text search capabilities on headers & body
|
||||
* Distinct live-streaming vs paging/browsing modes, all with filter applied
|
||||
|
||||
### improvements
|
||||
* GUI - source and destination IP addresses & service names for each traffic item
|
||||
* GUI - Mizu health - display warning sign in top bar when not all requested pods are successfully tapped
|
||||
* GUI - pod tapping status reflected in the list (ok or problem)
|
||||
* Mizu telemetry - report platform type
|
||||
|
||||
### fixes
|
||||
* Request duration and body size properly shown in GUI (instead of -1)
|
||||
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)
|
||||
|
||||
@@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
|
||||
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
|
||||
}
|
||||
|
||||
logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
|
||||
|
||||
return &Provider{
|
||||
clientSet: clientSet,
|
||||
kubernetesConfig: kubernetesConfig,
|
||||
@@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
labelSelector := applyconfmeta.LabelSelector()
|
||||
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
|
||||
|
||||
applyOptions := metav1.ApplyOptions{
|
||||
Force: true,
|
||||
FieldManager: fieldManagerName,
|
||||
}
|
||||
|
||||
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
|
||||
daemonSet.
|
||||
WithLabels(map[string]string{
|
||||
@@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
}).
|
||||
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
|
||||
|
||||
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
|
||||
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1000,7 +1007,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
|
||||
|
||||
matchingPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods {
|
||||
if isPodRunning(&pod) {
|
||||
if IsPodRunning(&pod) {
|
||||
matchingPods = append(matchingPods, pod)
|
||||
}
|
||||
}
|
||||
@@ -1190,6 +1197,6 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
|
||||
)
|
||||
}
|
||||
|
||||
func isPodRunning(pod *core.Pod) bool {
|
||||
func IsPodRunning(pod *core.Pod) bool {
|
||||
return pod.Status.Phase == core.PodRunning
|
||||
}
|
||||
|
||||
@@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
|
||||
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
|
||||
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
|
||||
clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err)
|
||||
}
|
||||
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
|
||||
|
||||
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
|
||||
logger.Log.Debugf("Http dialer url %v", serverURL)
|
||||
|
||||
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
|
||||
StandaloneMode bool `json:"standaloneMode"`
|
||||
ServiceMap bool `json:"serviceMap"`
|
||||
OAS bool `json:"oas"`
|
||||
Telemetry bool `json:"telemetry"`
|
||||
Elastic ElasticConfig `json:"elastic"`
|
||||
}
|
||||
|
||||
|
||||
@@ -39,10 +39,9 @@ type TCP struct {
|
||||
}
|
||||
|
||||
type Extension struct {
|
||||
Protocol *Protocol
|
||||
Path string
|
||||
Dissector Dissector
|
||||
MatcherMap *sync.Map
|
||||
Protocol *Protocol
|
||||
Path string
|
||||
Dissector Dissector
|
||||
}
|
||||
|
||||
type ConnectionInfo struct {
|
||||
@@ -62,7 +61,6 @@ type TcpID struct {
|
||||
}
|
||||
|
||||
type CounterPair struct {
|
||||
StreamId int64
|
||||
Request uint
|
||||
Response uint
|
||||
sync.Mutex
|
||||
@@ -100,10 +98,15 @@ type SuperIdentifier struct {
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
|
||||
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
|
||||
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
|
||||
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
|
||||
Macros() map[string]string
|
||||
NewResponseRequestMatcher() RequestResponseMatcher
|
||||
}
|
||||
|
||||
type RequestResponseMatcher interface {
|
||||
GetMap() *sync.Map
|
||||
}
|
||||
|
||||
type Emitting struct {
|
||||
@@ -125,6 +128,7 @@ type Entry struct {
|
||||
Protocol Protocol `json:"proto"`
|
||||
Source *TCP `json:"src"`
|
||||
Destination *TCP `json:"dst"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Outgoing bool `json:"outgoing"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
StartTime time.Time `json:"startTime"`
|
||||
|
||||
@@ -22,6 +22,7 @@ type Cleaner struct {
|
||||
connectionTimeout time.Duration
|
||||
stats CleanerStats
|
||||
statsMutex sync.Mutex
|
||||
streamsMap *tcpStreamMap
|
||||
}
|
||||
|
||||
func (cl *Cleaner) clean() {
|
||||
@@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
|
||||
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
|
||||
cl.assemblerMutex.Unlock()
|
||||
|
||||
for _, extension := range extensions {
|
||||
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout))
|
||||
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
|
||||
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
|
||||
if reqResMatcher == nil {
|
||||
return true
|
||||
}
|
||||
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
|
||||
cl.stats.deleted += deleted
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
cl.statsMutex.Lock()
|
||||
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
|
||||
|
||||
@@ -42,7 +42,7 @@ func (d dissecting) Ping() {
|
||||
|
||||
const amqpRequest string = "amqp_request"
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
|
||||
r := AmqpReader{b}
|
||||
|
||||
var remaining int
|
||||
@@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
|
||||
@@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
IP: item.ConnectionInfo.ServerIP,
|
||||
Port: item.ConnectionInfo.ServerPort,
|
||||
},
|
||||
Namespace: namespace,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Method: request["method"].(string),
|
||||
@@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return nil
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
func NewDissector() api.Dissector {
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect
|
||||
|
||||
@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
|
||||
item.ConnectionInfo.ClientPort = ""
|
||||
}
|
||||
|
||||
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
|
||||
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
||||
switch messageHTTP1 := messageHTTP1.(type) {
|
||||
case http.Request:
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%s_%s_%s_%s_%d_%s",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
@@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
||||
}
|
||||
case http.Response:
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%s_%s_%s_%s_%d_%s",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
@@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
|
||||
req, err = http.ReadRequest(b)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d_%s",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
@@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
return
|
||||
}
|
||||
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
|
||||
var res *http.Response
|
||||
res, err = http.ReadResponse(b, nil)
|
||||
if err != nil {
|
||||
@@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d_%s",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
|
||||
@@ -84,14 +84,15 @@ type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &http11protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", http11protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
|
||||
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
|
||||
|
||||
var err error
|
||||
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
|
||||
|
||||
@@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
if isHTTP2 {
|
||||
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
|
||||
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
superIdentifier.Protocol = &http11protocol
|
||||
} else if isClient {
|
||||
var req *http.Request
|
||||
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
|
||||
if switchingProtocolsHTTP2 {
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s 1 %s",
|
||||
"%s_%s_%s_%s_1_%s",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
@@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
}
|
||||
} else {
|
||||
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
var host, authority, path string
|
||||
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
@@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
IP: item.ConnectionInfo.ServerIP,
|
||||
Port: item.ConnectionInfo.ServerPort,
|
||||
},
|
||||
Namespace: namespace,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
@@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return createResponseRequestMatcher()
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
func NewDissector() api.Dissector {
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
|
||||
extension := &api.Extension{}
|
||||
dissector.Register(extension)
|
||||
assert.Equal(t, "http", extension.Protocol.Name)
|
||||
assert.NotNil(t, extension.MatcherMap)
|
||||
}
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
@@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
|
||||
SrcPort: "1",
|
||||
DstPort: "2",
|
||||
}
|
||||
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
|
||||
reqResMatcher := dissector.NewResponseRequestMatcher()
|
||||
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
|
||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
panic(err)
|
||||
}
|
||||
@@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
|
||||
SrcPort: "2",
|
||||
DstPort: "1",
|
||||
}
|
||||
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
|
||||
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
|
||||
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
panic(err)
|
||||
}
|
||||
@@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
|
||||
|
||||
stop <- true
|
||||
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
iMarshaled, err := json.Marshal(items[i])
|
||||
assert.Nil(t, err)
|
||||
jMarshaled, err := json.Marshal(items[j])
|
||||
assert.Nil(t, err)
|
||||
return len(iMarshaled) < len(jMarshaled)
|
||||
})
|
||||
|
||||
marshaled, err := json.Marshal(items)
|
||||
assert.Nil(t, err)
|
||||
|
||||
@@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) {
|
||||
|
||||
var entries []*api.Entry
|
||||
for _, item := range items {
|
||||
entry := dissector.Analyze(item, "", "")
|
||||
entry := dissector.Analyze(item, "", "", "")
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,16 +8,17 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
|
||||
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
|
||||
func createResponseRequestMatcher() requestResponseMatcher {
|
||||
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
return *newMatcher
|
||||
func createResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
|
||||
return matcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
||||
|
||||
@@ -33,27 +33,27 @@ type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &_protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", _protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
|
||||
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
|
||||
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
} else {
|
||||
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
|
||||
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
apiKey := ApiKey(reqDetails["apiKey"].(float64))
|
||||
@@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
IP: item.ConnectionInfo.ServerIP,
|
||||
Port: item.ConnectionInfo.ServerPort,
|
||||
},
|
||||
Namespace: namespace,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
|
||||
@@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return createResponseRequestMatcher()
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
func NewDissector() api.Dissector {
|
||||
|
||||
@@ -3,9 +3,10 @@ package kafka
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var reqResMatcher = CreateResponseRequestMatcher() // global
|
||||
const maxTry int = 3000
|
||||
|
||||
type RequestResponsePair struct {
|
||||
@@ -13,14 +14,17 @@ type RequestResponsePair struct {
|
||||
Response Response
|
||||
}
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
|
||||
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
|
||||
func CreateResponseRequestMatcher() requestResponseMatcher {
|
||||
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
return *newMatcher
|
||||
func createResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
|
||||
return matcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
|
||||
|
||||
@@ -19,7 +19,7 @@ type Request struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d",
|
||||
tcpID.SrcIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstIP,
|
||||
|
||||
@@ -16,7 +16,7 @@ type Response struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d",
|
||||
tcpID.DstIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcIP,
|
||||
|
||||
@@ -6,15 +6,14 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Request++
|
||||
requestCounter := counterPair.Request
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
@@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Response++
|
||||
responseCounter := counterPair.Response
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
"%s_%s_%s_%s_%d",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
|
||||
@@ -32,14 +32,14 @@ type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
|
||||
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
|
||||
is := &RedisInputStream{
|
||||
Reader: b,
|
||||
Buf: make([]byte, 8192),
|
||||
@@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
if isClient {
|
||||
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
|
||||
} else {
|
||||
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
response := item.Pair.Response.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
@@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
IP: item.ConnectionInfo.ServerIP,
|
||||
Port: item.ConnectionInfo.ServerPort,
|
||||
},
|
||||
Namespace: namespace,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
@@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return createResponseRequestMatcher()
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
func NewDissector() api.Dissector {
|
||||
|
||||
@@ -7,16 +7,17 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
|
||||
// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
|
||||
func createResponseRequestMatcher() requestResponseMatcher {
|
||||
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
return *newMatcher
|
||||
func createResponseRequestMatcher() api.RequestResponseMatcher {
|
||||
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
|
||||
return matcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
|
||||
@@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
|
||||
assemblerMutex: &assembler.assemblerMutex,
|
||||
cleanPeriod: cleanPeriod,
|
||||
connectionTimeout: staleConnectionTimeout,
|
||||
streamsMap: streamsMap,
|
||||
}
|
||||
cleaner.start()
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ type tcpReader struct {
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
|
||||
func (h *tcpReader) run(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
b := bufio.NewReader(h)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
|
||||
if err != nil {
|
||||
_, err = io.Copy(ioutil.Discard, b)
|
||||
if err != nil {
|
||||
|
||||
@@ -29,8 +29,9 @@ type tcpStreamFactory struct {
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
stream *tcpStream
|
||||
createdAt time.Time
|
||||
stream *tcpStream
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
||||
@@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
if stream.isTapTarget {
|
||||
stream.id = factory.streamsMap.nextId()
|
||||
for i, extension := range extensions {
|
||||
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
||||
counterPair := &api.CounterPair{
|
||||
StreamId: stream.id,
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
@@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
})
|
||||
stream.servers = append(stream.servers, tcpReader{
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
@@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
})
|
||||
|
||||
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
|
||||
stream: stream,
|
||||
createdAt: time.Now(),
|
||||
stream: stream,
|
||||
reqResMatcher: reqResMatcher,
|
||||
createdAt: time.Now(),
|
||||
})
|
||||
|
||||
factory.wg.Add(2)
|
||||
|
||||
@@ -22,6 +22,8 @@ import OasModal from "../../OasModal/OasModal";
|
||||
import {useCommonStyles} from "../../../helpers/commonStyle"
|
||||
import {TLSWarning} from "../../TLSWarning/TLSWarning";
|
||||
import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
|
||||
import serviceMap from "../../assets/serviceMap.svg";
|
||||
import services from "../../assets/services.svg";
|
||||
|
||||
const useLayoutStyles = makeStyles(() => ({
|
||||
details: {
|
||||
@@ -74,7 +76,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
|
||||
const scrollableRef = useRef(null);
|
||||
|
||||
const [openOasModal, setOpenOasModal] = useState(false);
|
||||
const handleOpenModal = () => setOpenOasModal(true);
|
||||
|
||||
const handleCloseModal = () => setOpenOasModal(false);
|
||||
|
||||
const [showTLSWarning, setShowTLSWarning] = useState(false);
|
||||
@@ -256,8 +258,14 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
|
||||
}
|
||||
}
|
||||
|
||||
const handleOpenOasModal = () => {
|
||||
ws.current.close();
|
||||
setOpenOasModal(true);
|
||||
}
|
||||
|
||||
const openServiceMapModalDebounce = debounce(() => {
|
||||
setServiceMapModalOpen(true)
|
||||
ws.current.close();
|
||||
setServiceMapModalOpen(true);
|
||||
}, 500);
|
||||
|
||||
return (
|
||||
@@ -277,17 +285,21 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
|
||||
</div>
|
||||
<div style={{ display: 'flex' }}>
|
||||
{window["isOasEnabled"] && <Button
|
||||
startIcon={<img className="custom" src={services} alt="services"></img>}
|
||||
size="large"
|
||||
type="submit"
|
||||
variant="contained"
|
||||
className={commonClasses.button}
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
style={{ marginRight: 25 }}
|
||||
onClick={handleOpenModal}
|
||||
onClick={handleOpenOasModal}
|
||||
>
|
||||
Show OAS
|
||||
</Button>}
|
||||
{window["isServiceMapEnabled"] && <Button
|
||||
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{marginRight:"8%"}}></img>}
|
||||
size="large"
|
||||
variant="contained"
|
||||
className={commonClasses.button}
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
onClick={openServiceMapModalDebounce}
|
||||
>
|
||||
Service Map
|
||||
|
||||
32
ui/src/components/ServiceMapModal/ServiceMapModal.sass
Normal file
32
ui/src/components/ServiceMapModal/ServiceMapModal.sass
Normal file
@@ -0,0 +1,32 @@
|
||||
@import "../../variables.module"
|
||||
|
||||
|
||||
.legend-scale ul
|
||||
margin-top: -29px
|
||||
margin-left: -27px
|
||||
padding: 0
|
||||
float: left
|
||||
list-style: none
|
||||
|
||||
li
|
||||
display: block
|
||||
float: left
|
||||
width: 50px
|
||||
margin-bottom: 6px
|
||||
text-align: center
|
||||
font-size: 80%
|
||||
list-style: none
|
||||
|
||||
ul.legend-labels li span
|
||||
display: block
|
||||
float: left
|
||||
height: 15px
|
||||
width: 50px
|
||||
|
||||
.legend-source
|
||||
font-size: 70%
|
||||
color: #999
|
||||
clear: both
|
||||
|
||||
a
|
||||
color: #777
|
||||
@@ -3,11 +3,14 @@ import { Box, Fade, Modal, Backdrop, Button } from "@material-ui/core";
|
||||
import { toast } from "react-toastify";
|
||||
import Api from "../../helpers/api";
|
||||
import spinnerStyle from '../style/Spinner.module.sass';
|
||||
import './ServiceMapModal.sass';
|
||||
import spinnerImg from '../assets/spinner.svg';
|
||||
import Graph from "react-graph-vis";
|
||||
import debounce from 'lodash/debounce';
|
||||
import ServiceMapOptions from './ServiceMapOptions'
|
||||
import { useCommonStyles } from "../../helpers/commonStyle";
|
||||
import refresh from "../assets/refresh.svg";
|
||||
import close from "../assets/close.svg";
|
||||
|
||||
interface GraphData {
|
||||
nodes: Node[];
|
||||
@@ -29,6 +32,7 @@ interface Edge {
|
||||
label: string;
|
||||
title?: string;
|
||||
color?: object;
|
||||
font?: object;
|
||||
}
|
||||
|
||||
interface ServiceMapNode {
|
||||
@@ -104,7 +108,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
|
||||
const newGraphData: GraphData = { nodes: [], edges: [] }
|
||||
|
||||
if (serviceMapData.nodes) {
|
||||
newGraphData.nodes = serviceMapData.nodes.map(node => {
|
||||
newGraphData.nodes = serviceMapData.nodes.map<Node>(node => {
|
||||
return {
|
||||
id: node.id,
|
||||
value: node.count,
|
||||
@@ -115,7 +119,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
|
||||
}
|
||||
|
||||
if (serviceMapData.edges) {
|
||||
newGraphData.edges = serviceMapData.edges.map(edge => {
|
||||
newGraphData.edges = serviceMapData.edges.map<Edge>(edge => {
|
||||
return {
|
||||
from: edge.source.id,
|
||||
to: edge.destination.id,
|
||||
@@ -125,6 +129,10 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
|
||||
color: edge.protocol.backgroundColor,
|
||||
highlight: edge.protocol.backgroundColor
|
||||
},
|
||||
font: {
|
||||
color: edge.protocol.backgroundColor,
|
||||
strokeColor: edge.protocol.backgroundColor
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -141,22 +149,10 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
|
||||
}, [isOpen])
|
||||
|
||||
useEffect(() => {
|
||||
getServiceMapData()
|
||||
getServiceMapData();
|
||||
return () => setGraphData({ nodes: [], edges: [] })
|
||||
}, [getServiceMapData])
|
||||
|
||||
const resetServiceMap = debounce(async () => {
|
||||
try {
|
||||
const serviceMapResetResponse = await api.serviceMapReset();
|
||||
if (serviceMapResetResponse["status"] === "enabled") {
|
||||
refreshServiceMap()
|
||||
}
|
||||
|
||||
} catch (ex) {
|
||||
toast.error("An error occurred while resetting Mizu Service Map, see console for mode details");
|
||||
console.error(ex);
|
||||
}
|
||||
}, 500);
|
||||
|
||||
const refreshServiceMap = debounce(() => {
|
||||
getServiceMapData();
|
||||
}, 500);
|
||||
@@ -180,33 +176,34 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
|
||||
<img alt="spinner" src={spinnerImg} style={{ height: 50 }} />
|
||||
</div>}
|
||||
{!isLoading && <div style={{ height: "100%", width: "100%" }}>
|
||||
<div style={{ display: "flex", justifyContent: "space-between" }}>
|
||||
<div>
|
||||
<Button
|
||||
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight:"8%"}}></img>}
|
||||
size="medium"
|
||||
variant="contained"
|
||||
className={commonClasses.button}
|
||||
style={{ marginRight: 25 }}
|
||||
onClick={() => onClose()}
|
||||
>
|
||||
Close
|
||||
</Button>
|
||||
<Button
|
||||
variant="contained"
|
||||
className={commonClasses.button}
|
||||
style={{ marginRight: 25 }}
|
||||
onClick={resetServiceMap}
|
||||
>
|
||||
Reset
|
||||
</Button>
|
||||
<Button
|
||||
variant="contained"
|
||||
className={commonClasses.button}
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
onClick={refreshServiceMap}
|
||||
>
|
||||
Refresh
|
||||
</Button>
|
||||
</div>
|
||||
<img src={close} alt="close" onClick={() => onClose()} style={{cursor:"pointer"}}></img>
|
||||
</div>
|
||||
<Graph
|
||||
graph={graphData}
|
||||
options={ServiceMapOptions}
|
||||
/>
|
||||
<div className='legend-scale'>
|
||||
<ul className='legend-labels'>
|
||||
<li><span style={{ background: '#205cf5' }}></span>HTTP</li>
|
||||
<li><span style={{ background: '#244c5a' }}></span>HTTP/2</li>
|
||||
<li><span style={{ background: '#244c5a' }}></span>gRPC</li>
|
||||
<li><span style={{ background: '#ff6600' }}></span>AMQP</li>
|
||||
<li><span style={{ background: '#000000' }}></span>KAFKA</li>
|
||||
<li><span style={{ background: '#a41e11' }}></span>REDIS</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>}
|
||||
</Box>
|
||||
</Fade>
|
||||
|
||||
@@ -1,3 +1,142 @@
|
||||
|
||||
const minNodeScaling = 10
|
||||
const maxNodeScaling = 30
|
||||
|
||||
const minEdgeScaling = 1
|
||||
const maxEdgeScaling = maxNodeScaling / 2
|
||||
|
||||
const minLabelScaling = 11
|
||||
const maxLabelScaling = 16
|
||||
const selectedNodeColor = "#0C0B1A"
|
||||
const selectedNodeBorderColor = "#205CF5"
|
||||
const selectedNodeLabelColor = "#205CF5"
|
||||
const selectedEdgeLabelColor = "#205CF5"
|
||||
|
||||
const customScaling = (min, max, total, value) => {
|
||||
if (max === min) {
|
||||
return 0.5;
|
||||
}
|
||||
else {
|
||||
const scale = 1 / (max - min);
|
||||
return Math.max(0, (value - min) * scale);
|
||||
}
|
||||
}
|
||||
|
||||
const nodeSelected = (values, id, selected, hovering) => {
|
||||
values.color = selectedNodeColor;
|
||||
values.borderColor = selectedNodeBorderColor;
|
||||
values.borderWidth = 4;
|
||||
}
|
||||
|
||||
const nodeLabelSelected = (values, id, selected, hovering) => {
|
||||
values.size = values.size + 1;
|
||||
values.color = selectedNodeLabelColor;
|
||||
values.strokeColor = selectedNodeLabelColor;
|
||||
values.strokeWidth = 0.2
|
||||
}
|
||||
|
||||
const edgeSelected = (values, id, selected, hovering) => {
|
||||
values.opacity = 0.4;
|
||||
values.width = values.width + 1;
|
||||
}
|
||||
|
||||
const edgeLabelSelected = (values, id, selected, hovering) => {
|
||||
values.size = values.size + 1;
|
||||
values.color = selectedEdgeLabelColor;
|
||||
values.strokeColor = selectedEdgeLabelColor;
|
||||
values.strokeWidth = 0.2
|
||||
}
|
||||
|
||||
const nodeOptions = {
|
||||
shape: 'dot',
|
||||
chosen: {
|
||||
node: nodeSelected,
|
||||
label: nodeLabelSelected,
|
||||
},
|
||||
color: {
|
||||
background: '#494677',
|
||||
border: selectedNodeColor,
|
||||
},
|
||||
font: {
|
||||
color: selectedNodeColor,
|
||||
size: 11, // px
|
||||
face: 'Roboto',
|
||||
background: '#FFFFFFBF',
|
||||
strokeWidth: 0.2, // px
|
||||
strokeColor: selectedNodeColor,
|
||||
align: 'center',
|
||||
multi: false,
|
||||
},
|
||||
// defines the node min and max sizes when zoom in/out, based on the node value
|
||||
scaling: {
|
||||
min: minNodeScaling,
|
||||
max: maxNodeScaling,
|
||||
// defines the label scaling size in px
|
||||
label: {
|
||||
enabled: true,
|
||||
min: minLabelScaling,
|
||||
max: maxLabelScaling,
|
||||
maxVisible: maxLabelScaling,
|
||||
drawThreshold: 5,
|
||||
},
|
||||
customScalingFunction: customScaling,
|
||||
},
|
||||
borderWidth: 2,
|
||||
labelHighlightBold: true,
|
||||
opacity: 1,
|
||||
shadow: true,
|
||||
}
|
||||
|
||||
const edgeOptions = {
|
||||
chosen: {
|
||||
edge: edgeSelected,
|
||||
label: edgeLabelSelected,
|
||||
},
|
||||
dashes: false,
|
||||
arrowStrikethrough: false,
|
||||
arrows: {
|
||||
to: {
|
||||
enabled: true,
|
||||
},
|
||||
middle: {
|
||||
enabled: false,
|
||||
},
|
||||
from: {
|
||||
enabled: false,
|
||||
}
|
||||
},
|
||||
smooth: {
|
||||
enabled: true,
|
||||
type: 'dynamic',
|
||||
roundness: 1.0
|
||||
},
|
||||
font: {
|
||||
color: '#000000',
|
||||
size: 11, // px
|
||||
face: 'Roboto',
|
||||
background: '#FFFFFFCC',
|
||||
strokeWidth: 0.2, // px
|
||||
strokeColor: '#000000',
|
||||
align: 'horizontal',
|
||||
multi: false,
|
||||
},
|
||||
scaling: {
|
||||
min: minEdgeScaling,
|
||||
max: maxEdgeScaling,
|
||||
label: {
|
||||
enabled: true,
|
||||
min: minLabelScaling,
|
||||
max: maxLabelScaling,
|
||||
maxVisible: maxLabelScaling,
|
||||
drawThreshold: 5
|
||||
},
|
||||
customScalingFunction: customScaling,
|
||||
},
|
||||
labelHighlightBold: true,
|
||||
selectionWidth: 1,
|
||||
shadow: true,
|
||||
}
|
||||
|
||||
const ServiceMapOptions = {
|
||||
physics: {
|
||||
enabled: true,
|
||||
@@ -5,10 +144,10 @@ const ServiceMapOptions = {
|
||||
barnesHut: {
|
||||
theta: 0.5,
|
||||
gravitationalConstant: -2000,
|
||||
centralGravity: 0.3,
|
||||
centralGravity: 0.4,
|
||||
springLength: 180,
|
||||
springConstant: 0.04,
|
||||
damping: 0.09,
|
||||
damping: 0.2,
|
||||
avoidOverlap: 1
|
||||
},
|
||||
},
|
||||
@@ -16,68 +155,20 @@ const ServiceMapOptions = {
|
||||
hierarchical: false,
|
||||
randomSeed: 1 // always on node 1
|
||||
},
|
||||
nodes: {
|
||||
shape: 'dot',
|
||||
chosen: true,
|
||||
color: {
|
||||
background: '#27AE60',
|
||||
border: '#000000',
|
||||
highlight: {
|
||||
background: '#27AE60',
|
||||
border: '#000000',
|
||||
},
|
||||
},
|
||||
font: {
|
||||
color: '#343434',
|
||||
size: 14, // px
|
||||
face: 'arial',
|
||||
background: 'none',
|
||||
strokeWidth: 0, // px
|
||||
strokeColor: '#ffffff',
|
||||
align: 'center',
|
||||
multi: false,
|
||||
},
|
||||
borderWidth: 1.5,
|
||||
borderWidthSelected: 2.5,
|
||||
labelHighlightBold: true,
|
||||
opacity: 1,
|
||||
shadow: true,
|
||||
},
|
||||
edges: {
|
||||
chosen: true,
|
||||
dashes: false,
|
||||
arrowStrikethrough: false,
|
||||
arrows: {
|
||||
to: {
|
||||
enabled: true,
|
||||
},
|
||||
middle: {
|
||||
enabled: false,
|
||||
},
|
||||
from: {
|
||||
enabled: false,
|
||||
}
|
||||
},
|
||||
smooth: {
|
||||
enabled: true,
|
||||
type: 'dynamic',
|
||||
roundness: 1.0
|
||||
},
|
||||
font: {
|
||||
color: '#343434',
|
||||
size: 12, // px
|
||||
face: 'arial',
|
||||
background: 'none',
|
||||
strokeWidth: 2, // px
|
||||
strokeColor: '#ffffff',
|
||||
align: 'horizontal',
|
||||
multi: false,
|
||||
},
|
||||
labelHighlightBold: true,
|
||||
selectionWidth: 1,
|
||||
shadow: true,
|
||||
},
|
||||
nodes: nodeOptions,
|
||||
edges: edgeOptions,
|
||||
autoResize: true,
|
||||
interaction: {
|
||||
selectable: true,
|
||||
selectConnectedEdges: true,
|
||||
multiselect: true,
|
||||
dragNodes: true,
|
||||
dragView: true,
|
||||
hover: true,
|
||||
hoverConnectedEdges: true,
|
||||
zoomView: true,
|
||||
zoomSpeed: 1,
|
||||
},
|
||||
};
|
||||
|
||||
export default ServiceMapOptions
|
||||
4
ui/src/components/assets/close.svg
Normal file
4
ui/src/components/assets/close.svg
Normal file
@@ -0,0 +1,4 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M18.591 9.99997C18.591 14.7446 14.7447 18.5909 10.0001 18.5909C5.25546 18.5909 1.40918 14.7446 1.40918 9.99997C1.40918 5.25534 5.25546 1.40906 10.0001 1.40906C14.7447 1.40906 18.591 5.25534 18.591 9.99997Z" fill="#E9EBF8" stroke="#BCCEFD"/>
|
||||
<path d="M13.1604 8.23038L11.95 7.01994L10.1392 8.83078L8.32832 7.01994L7.11789 8.23038L8.92872 10.0412L7.12046 11.8495L8.33089 13.0599L10.1392 11.2517L11.9474 13.0599L13.1579 11.8495L11.3496 10.0412L13.1604 8.23038Z" fill="#205CF5"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 588 B |
3
ui/src/components/assets/refresh.svg
Normal file
3
ui/src/components/assets/refresh.svg
Normal file
@@ -0,0 +1,3 @@
|
||||
<svg width="26" height="26" viewBox="0 0 26 26" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M10.8337 11.9167H7.69308L7.69416 11.907C7.83561 11.2143 8.11247 10.5564 8.50883 9.97105C9.09865 9.10202 9.92598 8.42097 10.8922 8.00913C11.2193 7.87046 11.5606 7.7643 11.9083 7.69388C12.6297 7.54762 13.3731 7.54762 14.0945 7.69388C15.1312 7.90631 16.0825 8.41908 16.8299 9.1683L18.3639 7.63863C17.6725 6.94707 16.8546 6.39501 15.9546 6.01255C15.4956 5.81823 15.0184 5.67016 14.53 5.57055C13.5223 5.36581 12.4838 5.36581 11.4761 5.57055C10.9873 5.67057 10.5098 5.819 10.0504 6.01363C8.69682 6.58791 7.53808 7.54123 6.71374 8.7588C6.15895 9.5798 5.77099 10.5019 5.57191 11.4725C5.54158 11.6188 5.52533 11.7683 5.50366 11.9167H2.16699L6.50033 16.25L10.8337 11.9167ZM15.167 14.0834H18.3076L18.3065 14.092C18.0234 15.4806 17.205 16.7019 16.0282 17.4915C15.443 17.8882 14.7851 18.1651 14.0923 18.3062C13.3713 18.4525 12.6283 18.4525 11.9072 18.3062C11.2146 18.1648 10.5567 17.8879 9.97133 17.4915C9.68383 17.2971 9.41541 17.0758 9.16966 16.8307L7.63783 18.3625C8.32954 19.0539 9.14791 19.6056 10.0482 19.9875C10.5076 20.1825 10.9875 20.331 11.4728 20.4295C12.4801 20.6344 13.5184 20.6344 14.5257 20.4295C16.4676 20.0265 18.1757 18.8819 19.2869 17.2391C19.8412 16.4187 20.2288 15.4974 20.4277 14.5275C20.4569 14.3813 20.4742 14.2318 20.4959 14.0834H23.8337L19.5003 9.75005L15.167 14.0834Z" fill="#205CF5"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 1.4 KiB |
15
ui/src/components/assets/serviceMap.svg
Normal file
15
ui/src/components/assets/serviceMap.svg
Normal file
@@ -0,0 +1,15 @@
|
||||
<svg width="18" height="18" viewBox="0 0 18 18" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M15.2998 5.40002L8.9998 9.00002" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M15.2998 12.6L8.99976 8.99988" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M2.7002 5.40002L2.70024 12.5001" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M9 8.99988V16.2" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M2.7 5.40005L9 1.80005" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M15.2998 5.40005L8.9998 1.80005" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<path d="M4.4999 5.39995C4.4999 6.39406 3.69402 7.19995 2.6999 7.19995C1.70579 7.19995 0.899902 6.39406 0.899902 5.39995C0.899902 4.40584 1.70579 3.59995 2.6999 3.59995C3.69402 3.59995 4.4999 4.40584 4.4999 5.39995Z" fill="#205CF5"/>
|
||||
<path d="M4.4999 12.6C4.4999 13.5941 3.69402 14.4 2.6999 14.4C1.70579 14.4 0.899902 13.5941 0.899902 12.6C0.899902 11.6059 1.70579 10.8 2.6999 10.8C3.69402 10.8 4.4999 11.6059 4.4999 12.6Z" fill="#205CF5"/>
|
||||
<path d="M17.1 5.39995C17.1 6.39406 16.2941 7.19995 15.3 7.19995C14.3059 7.19995 13.5 6.39406 13.5 5.39995C13.5 4.40584 14.3059 3.59995 15.3 3.59995C16.2941 3.59995 17.1 4.40584 17.1 5.39995Z" fill="#205CF5"/>
|
||||
<path d="M17.1 12.6C17.1 13.5941 16.2941 14.4 15.3 14.4C14.3059 14.4 13.5 13.5941 13.5 12.6C13.5 11.6059 14.3059 10.8 15.3 10.8C16.2941 10.8 17.1 11.6059 17.1 12.6Z" fill="#205CF5"/>
|
||||
<path d="M10.8002 1.79998C10.8002 2.79409 9.99431 3.59998 9.0002 3.59998C8.00608 3.59998 7.2002 2.79409 7.2002 1.79998C7.2002 0.805863 8.00608 -2.45571e-05 9.0002 -2.45571e-05C9.99431 -2.45571e-05 10.8002 0.805863 10.8002 1.79998Z" fill="#205CF5"/>
|
||||
<path d="M10.8002 9.00005C10.8002 9.99416 9.99431 10.8 9.0002 10.8C8.00608 10.8 7.2002 9.99416 7.2002 9.00005C7.2002 8.00594 8.00608 7.20005 9.0002 7.20005C9.99431 7.20005 10.8002 8.00594 10.8002 9.00005Z" fill="#205CF5"/>
|
||||
<path d="M10.8002 16.2C10.8002 17.1941 9.99431 18 9.0002 18C8.00608 18 7.2002 17.1941 7.2002 16.2C7.2002 15.2059 8.00608 14.4 9.0002 14.4C9.99431 14.4 10.8002 15.2059 10.8002 16.2Z" fill="#205CF5"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 2.2 KiB |
10
ui/src/components/assets/services.svg
Normal file
10
ui/src/components/assets/services.svg
Normal file
@@ -0,0 +1,10 @@
|
||||
<svg width="30" height="30" viewBox="0 0 30 30" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect x="5.5" y="6.5" width="20" height="17" rx="1.5" stroke="#A0B2FF"/>
|
||||
<path d="M18.5 6.5H24C24.8284 6.5 25.5 7.17157 25.5 8V22C25.5 22.8284 24.8284 23.5 24 23.5H18.5V6.5Z" fill="#BCCEFD" stroke="#A0B2FF"/>
|
||||
<rect x="19" y="9" width="5" height="2" rx="1" fill="white"/>
|
||||
<rect x="8" y="9" width="4" height="2" rx="1" fill="#205CF5"/>
|
||||
<rect x="8" y="14" width="7" height="2" rx="1" fill="#205CF5"/>
|
||||
<rect width="3" height="2" rx="1" transform="matrix(-1 0 0 1 22 14)" fill="white"/>
|
||||
<path d="M24 20C24 19.4477 23.5523 19 23 19H19V21H23C23.5523 21 24 20.5523 24 20Z" fill="white"/>
|
||||
<rect x="8" y="19" width="7" height="2" rx="1" fill="#205CF5"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 747 B |
@@ -9,7 +9,7 @@ export const useCommonStyles = makeStyles(() => ({
|
||||
fontSize: 12,
|
||||
padding: "8px 12px",
|
||||
borderRadius: "6px ! important",
|
||||
|
||||
|
||||
"&:hover": {
|
||||
backgroundColor: "#205cf5",
|
||||
},
|
||||
@@ -27,6 +27,11 @@ export const useCommonStyles = makeStyles(() => ({
|
||||
backgroundColor: "transparent",
|
||||
},
|
||||
},
|
||||
|
||||
imagedButton: {
|
||||
padding: "0px 14px"
|
||||
},
|
||||
|
||||
textField: {
|
||||
outline: 0,
|
||||
background: "white",
|
||||
|
||||
Reference in New Issue
Block a user