Compare commits

...

20 Commits

Author SHA1 Message Date
AmitUp9
5dfa94d76e service map - reset button and function deleted (#805)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-15 22:57:56 +02:00
Alex Haiut
dfe63f2318 enabled service-map by default (#816) 2022-02-15 22:41:57 +02:00
Alex Haiut
9cf64a43f5 modified namespace in helm command (#814) 2022-02-15 17:14:37 +02:00
Igor Gov
bf2362d836 Update mizu install command (#811) 2022-02-15 16:05:28 +02:00
Igor Gov
1c11523d9d View command - no version check (#810) 2022-02-15 15:49:44 +02:00
Nimrod Gilboa Markevich
150a87413d Print http error response details (#792) 2022-02-15 12:29:34 +02:00
Igor Gov
f7221a7355 Sending telemetry config to server (#808) 2022-02-15 11:08:16 +02:00
RamiBerm
4197ec198c Tag entries destination namespace (#803)
* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent
2022-02-15 10:51:38 +02:00
Nimrod Gilboa Markevich
5484b7c491 Force DaemonSet apply (#804)
Required for apply to work if the DaemonSet is created by another program e.g. Helm.
2022-02-15 10:16:24 +02:00
M. Mert Yıldıran
041223b558 Move the request-response matcher's scope from global-level to TCP stream-level (#793)
* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-14 18:16:58 +03:00
Igor Gov
71c04d20ef Support rancher client config (#802) 2022-02-14 16:48:33 +02:00
Igor Gov
8852bac77b Adding logs of k8s client config (#800) 2022-02-14 06:26:57 +02:00
Alex Haiut
59d21e19b7 moved CHANGELOG to Mizu wiki in Github (#801) 2022-02-14 00:16:33 +02:00
AmitUp9
884cb791fc ws pause after buttons click (#798)
* ws pause after buttons click

* name change

Co-authored-by: Igor Gov <iggvrv@gmail.com>
2022-02-13 17:03:33 +02:00
M. Mert Yıldıran
cb332cedd4 Upgrade Basenine version to v0.4.16 (#796) 2022-02-13 06:22:04 +02:00
Igor Gov
391af95fb5 Fix: tapper count check (#791) 2022-02-10 17:00:13 +02:00
RoyUP9
9e62eaf4de Fixed view port (#790) 2022-02-10 16:17:09 +02:00
Igor Gov
81e830dd18 Check that API server and tappers are running in check cmd (#789)
* Check if API server and tapper are running in check cmd
2022-02-10 16:00:33 +02:00
Igor Gov
88e3fba1ef Fix: tapper status race condition (#788) 2022-02-10 12:13:40 +02:00
Gustavo Massaneiro
a4066da5d7 TRA-4259 Service Map UI improvements (#753)
Co-authored-by: Amit Fainholts <amit@up9.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Igor Gov <iggvrv@gmail.com>
2022-02-10 11:22:07 +02:00
48 changed files with 588 additions and 449 deletions

View File

@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.4.14/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.4.14/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.4.16/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.4.16/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
RUN chmod +x ./basenine_linux_${GOARCH}
RUN mv ./basenine_linux_${GOARCH} ./basenine

View File

@@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
@@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
if resolvedSourceObject == nil {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
if resolvedDestinationObject == nil {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination
return resolvedSource, resolvedDestination, namespace
}
func CheckIsServiceIP(address string) bool {

View File

@@ -34,9 +34,12 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
tappers.Connected()
} else {
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
socketListLock.Lock()
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
socketListLock.Unlock()
BroadcastTappedPodsStatus()
}
}
@@ -101,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedNameObject != nil {
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}

20
agent/pkg/api/utils.go Normal file
View File

@@ -0,0 +1,20 @@
package api
import (
"encoding/json"
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
func BroadcastTappedPodsStatus() {
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
logger.Log.Errorf("Could not Marshal message %v", err)
} else {
BroadcastToBrowserClients(jsonBytes)
}
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/providers"
@@ -36,7 +37,7 @@ func PostTapConfig(c *gin.Context) {
tappedPods.Set([]*shared.PodInfo{})
tappers.ResetStatus()
broadcastTappedPodsStatus()
api.BroadcastTappedPodsStatus()
}
var tappedNamespaces []string
@@ -135,7 +136,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
}
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
broadcastTappedPodsStatus()
api.BroadcastTappedPodsStatus()
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
if !ok {
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
@@ -143,7 +144,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
}
tappers.SetStatus(&tapperStatus)
broadcastTappedPodsStatus()
api.BroadcastTappedPodsStatus()
case <-ctx.Done():
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
return

View File

@@ -1,7 +1,6 @@
package controllers
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
@@ -39,18 +38,7 @@ func PostTappedPods(c *gin.Context) {
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
tappedPods.Set(requestTappedPods)
broadcastTappedPodsStatus()
}
func broadcastTappedPodsStatus() {
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
logger.Log.Errorf("Could not Marshal message %v", err)
} else {
api.BroadcastToBrowserClients(jsonBytes)
}
api.BroadcastTappedPodsStatus()
}
func PostTapperStatus(c *gin.Context) {
@@ -67,7 +55,7 @@ func PostTapperStatus(c *gin.Context) {
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
tappers.SetStatus(tapperStatus)
broadcastTappedPodsStatus()
api.BroadcastTappedPodsStatus()
}
func GetConnectedTappersCount(c *gin.Context) {

View File

@@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)

View File

@@ -30,6 +30,11 @@ type Resolver struct {
namespace string
}
type ResolvedObjectInfo struct {
FullAddress string
Namespace string
}
func (resolver *Resolver) Start(ctx context.Context) {
if !resolver.isStarted {
resolver.isStarted = true
@@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) string {
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
resolvedName, isFound := resolver.nameMap.Get(name)
if !isFound {
return ""
return nil
}
return resolvedName.(string)
return resolvedName.(*ResolvedObjectInfo)
}
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
@@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
}
if event.Type == watch.Deleted {
pod := event.Object.(*corev1.Pod)
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
}
case <-ctx.Done():
watcher.Stop()
@@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
}
if subset.Addresses != nil {
for _, address := range subset.Addresses {
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
for _, port := range ports {
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
}
}
}
@@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
service := event.Object.(*corev1.Service)
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
if service.Spec.Ports != nil {
for _, port := range service.Spec.Ports {
if port.Port > 0 {
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
}
}
}
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
}
}
case <-ctx.Done():
@@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
}
}
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.serviceMap.Remove(key)
} else {
resolver.serviceMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
}
}

View File

@@ -1,5 +1,5 @@
# Mizu release _VER_
Full changelog for stable release see in [docs](https://github.com/up9inc/mizu/blob/main/docs/CHANGELOG.md)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)
## Download Mizu for your platform

View File

@@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if response, err := provider.client.Get(echoUrl); err != nil {
if _, err := provider.get(echoUrl); err != nil {
return false, err
} else if response.StatusCode != 200 {
return false, fmt.Errorf("invalid status code %v", response.StatusCode)
} else {
return true, nil
}
@@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
return nil
@@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
return nil
@@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.client.Get(generalStatsUrl)
response, requestErr := provider.get(generalStatsUrl)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
} else if response.StatusCode != 200 {
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
}
defer response.Body.Close()
@@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.client.Do(req)
statusResp, err := provider.do(req)
if err != nil {
return "", err
}
@@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

View File

@@ -31,7 +31,7 @@ func runMizuCheck() {
}
if checkPassed {
checkPassed = checkAllResourcesExist(ctx, kubernetesProvider, isInstallCommand)
checkPassed = checkK8sResources(ctx, kubernetesProvider, isInstallCommand)
}
if checkPassed {
@@ -66,7 +66,7 @@ func checkKubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
}
func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, bool) {
logger.Log.Infof("\nmizu-mode\n--------------------")
logger.Log.Infof("\nmode\n--------------------")
if exist, err := kubernetesProvider.DoesDeploymentExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v can't check mizu command, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
@@ -79,7 +79,7 @@ func checkMizuMode(ctx context.Context, kubernetesProvider *kubernetes.Provider)
return false, false
} else if exist {
logger.Log.Infof("%v mizu running with tap command", fmt.Sprintf(uiUtils.Green, "√"))
return true, true
return true, false
} else {
logger.Log.Infof("%v mizu is not running", fmt.Sprintf(uiUtils.Red, "✗"))
return false, false
@@ -99,9 +99,9 @@ func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
}
func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nmizu-connectivity\n--------------------")
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
serverUrl := GetApiServerUrl()
serverUrl := GetApiServerUrl(config.Config.Tap.GuiPort)
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err == nil {
@@ -169,8 +169,8 @@ func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider)
return nil
}
func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
logger.Log.Infof("\nmizu-existence\n--------------------")
func checkK8sResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {
logger.Log.Infof("\nk8s-components\n--------------------")
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
@@ -227,7 +227,43 @@ func checkTapResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.
exist, err := kubernetesProvider.DoesPodExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
tapResourcesExist := checkResourceExist(kubernetes.ApiServerPodName, "pod", exist, err)
return tapResourcesExist
if !tapResourcesExist {
return false
}
if pod, err := kubernetesProvider.GetPod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pod exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
return false
} else if kubernetes.IsPodRunning(pod) {
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
} else {
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
}
tapperRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperPodName))
if pods, err := kubernetesProvider.ListAllPodsMatchingRegex(ctx, tapperRegex, []string{config.Config.MizuResourcesNamespace}); err != nil {
logger.Log.Errorf("%v error listing '%v' pods, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
return false
} else {
tappers := 0
notRunningTappers := 0
for _, pod := range pods {
tappers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
}
}
if notRunningTappers > 0 {
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
return false
}
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
return true
}
}
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {

View File

@@ -23,12 +23,12 @@ import (
"github.com/up9inc/mizu/shared/logger"
)
func GetApiServerUrl() string {
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
func GetApiServerUrl(port uint16) string {
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(port))
}
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, port uint16) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, port, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
@@ -36,7 +36,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
if err := httpServer.Shutdown(ctx); err != nil {
@@ -44,14 +44,14 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
}
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel); err != nil {
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, port, ctx, cancel); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
cancel()
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()

View File

@@ -1,10 +1,9 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@@ -12,13 +11,13 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
runMizuInstall()
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if config.Config.IsNsRestrictedMode() {
return fmt.Errorf("install is not supported in restricted namespace mode")
}
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace")
return nil
},
@@ -27,4 +26,3 @@ var installCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(installCmd)
}

View File

@@ -1,81 +0,0 @@
package cmd
import (
"context"
"errors"
"fmt"
"github.com/creasty/defaults"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/resources"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func runMizuInstall() {
kubernetesProvider, err := getKubernetesProviderForCli()
if err != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
var serializedValidationRules string
var serializedContract string
var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000
defaultResources := shared.Resources{}
if err := defaults.Set(&defaultResources); err != nil {
logger.Log.Debug(err)
}
mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources)
serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err)))
return
}
if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules,
serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(),
config.Config.MizuResourcesNamespace, config.Config.AgentImage,
config.Config.KratosImage, config.Config.KetoImage,
nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(),
config.Config.LogLevel(), false); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance")
} else {
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
}
return
}
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
}
func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig {
mizuAgentConfig := shared.MizuAgentConfig{
MaxDBSizeBytes: maxDBSizeBytes,
AgentImage: config.Config.AgentImage,
PullPolicy: config.Config.ImagePullPolicyStr,
LogLevel: config.Config.LogLevel(),
TapperResources: tapperResources,
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
AgentDatabasePath: shared.DataDirPath,
StandaloneMode: true,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Elastic: config.Config.Elastic,
}
return &mizuAgentConfig
}

View File

@@ -45,7 +45,7 @@ var apiProvider *apiserver.Provider
func RunMizuTap() {
state.startTime = time.Now()
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
apiProvider = apiserver.NewProvider(GetApiServerUrl(config.Config.Tap.GuiPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
var err error
var serializedValidationRules string
@@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
AgentDatabasePath: shared.DataDirPath,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Telemetry: config.Config.Telemetry,
Elastic: config.Config.Elastic,
}
@@ -421,7 +422,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
}
func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.Tap.GuiPort)
options, _ := getMizuApiFilteringOptions()
if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
@@ -429,7 +430,7 @@ func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Pr
cancel()
}
url := GetApiServerUrl()
url := GetApiServerUrl(config.Config.Tap.GuiPort)
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)

View File

@@ -3,13 +3,13 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/utils"
"net/http"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
@@ -39,7 +39,7 @@ func runMizuView() {
return
}
url = GetApiServerUrl()
url = GetApiServerUrl(config.Config.View.GuiPort)
response, err := http.Get(fmt.Sprintf("%s/", url))
if err == nil && response.StatusCode == 200 {
@@ -47,7 +47,7 @@ func runMizuView() {
return
}
logger.Log.Infof("Establishing connection to k8s cluster...")
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, config.Config.View.GuiPort)
}
apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout)
@@ -62,14 +62,5 @@ func runMizuView() {
uiUtils.OpenBrowser(url)
}
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()
return
} else if !isCompatible {
cancel()
return
}
utils.WaitForFinish(ctx, cancel)
}

View File

@@ -38,7 +38,7 @@ type ConfigStruct struct {
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
ServiceMap bool `yaml:"service-map" default:"false"`
ServiceMap bool `yaml:"service-map" default:"true"`
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
Elastic shared.ElasticConfig `yaml:"elastic"`
}

View File

@@ -1,72 +1 @@
# CHANGELOG
This document summarizes main and fixes changes published in stable (aka `main`) branch of this project.
Ongoing work and development releases are under `develop` branch.
## 0.24.0
### main features
* ARM64 support -- Mizu is now available for ARM 64bit architecture
* Now you can run Mizu with `minikube` on your Apple M1 laptop or any other ARM-based hosts
* New command helps user verify Mizu deployment
* Run `mizu check` to verify Mizu was deployed successfully
* `mizu check` verifies version compatibility, resources and permissions required by Mizu
* EXPERIMENTAL: Service Map - graph of all service interactions
* Arrow direction show client to server connection
* Graph edge width reflects volume of traffic captured between the services
* to enable this experimental feature use `--set service-map=true` flag
### improvements
* Mizu container images are now served from [Docker Hub](https://hub.docker.com/r/up9inc/mizu), as multi-architecture images (arm64, amd64)
* in Mizu GUI the filter query can now be applied by pressing CONTROL/COMMAND + ENTER
* try port-forwarding if http-proxy connection to Mizu API server is not available
### notable bug fixes
* Fixed HTTP/1.0 presentation which was shown as HTTP/1.1
* Fixed handling of long-living TCP connections, improves capturing gRPC and HTTP/2 traffic, and helps in service-mesh setups (istio, linkerd)
## 0.23.0
### notable bug fixes
* fixed errors in Redis protocol parser (better handling of Array and Bulk String message types)
## 0.22.0
### main features
* Service Mesh support -- mizu is now capable to tap mTLS traffic between pods connected by Istio service mesh
* Use `--service-mesh` option to enable this feature
* New installation option - have the same Mizu functionality as long living pods in your cluster, with password protection
* To install use `mizu install` command
* To access use `mizu view` or `kubectl -n mizu port-forward svc/mizu-api-server`
* To uninstall run `mizu clean`
* At first login
* Set admin password as prompted, use it to login to mizu later on.
* After login, user should select cluster namespaces to tap: by default all namespaces in the cluster are selected, user can select/unselect according to their needs. These settings are retained and can be modified at any time via Settings menu (cog icon on the top-right)
### improvements
* improved Mizu permissions/roles logic to support clusters with strict PodSecurityPolicy (PSP) -- see [PERMISSIONS](PERMISSIONS.md) doc for more details
### notable bug fixes
* mizu now works properly when API service is exposed via HTTPS url
* mizu now properly displays KAFKA message body
## 0.21.0
### main features
* New traffic search & stream exprience
* Rich query language with full-text search capabilities on headers & body
* Distinct live-streaming vs paging/browsing modes, all with filter applied
### improvements
* GUI - source and destination IP addresses & service names for each traffic item
* GUI - Mizu health - display warning sign in top bar when not all requested pods are successfully tapped
* GUI - pod tapping status reflected in the list (ok or problem)
* Mizu telemetry - report platform type
### fixes
* Request duration and body size properly shown in GUI (instead of -1)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)

View File

@@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
@@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
@@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}
@@ -1000,7 +1007,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
matchingPods := make([]core.Pod, 0)
for _, pod := range pods {
if isPodRunning(&pod) {
if IsPodRunning(&pod) {
matchingPods = append(matchingPods, pod)
}
}
@@ -1190,6 +1197,6 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
)
}
func isPodRunning(pod *core.Pod) bool {
func IsPodRunning(pod *core.Pod) bool {
return pod.Status.Phase == core.PodRunning
}

View File

@@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
return nil, err
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host)
if err != nil {
return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err)
}
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
logger.Log.Debugf("Http dialer url %v", serverURL)
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
}

View File

@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
StandaloneMode bool `json:"standaloneMode"`
ServiceMap bool `json:"serviceMap"`
OAS bool `json:"oas"`
Telemetry bool `json:"telemetry"`
Elastic ElasticConfig `json:"elastic"`
}

View File

@@ -39,10 +39,9 @@ type TCP struct {
}
type Extension struct {
Protocol *Protocol
Path string
Dissector Dissector
MatcherMap *sync.Map
Protocol *Protocol
Path string
Dissector Dissector
}
type ConnectionInfo struct {
@@ -62,7 +61,6 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
@@ -100,10 +98,15 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
type RequestResponseMatcher interface {
GetMap() *sync.Map
}
type Emitting struct {
@@ -125,6 +128,7 @@ type Entry struct {
Protocol Protocol `json:"proto"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@@ -22,6 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration
stats CleanerStats
statsMutex sync.Mutex
streamsMap *tcpStreamMap
}
func (cl *Cleaner) clean() {
@@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
for _, extension := range extensions {
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout))
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
if reqResMatcher == nil {
return true
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
}
return true
})
cl.statsMutex.Lock()
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())

View File

@@ -42,7 +42,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),
@@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
@@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -84,14 +84,15 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &http11protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
@@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
"%s_%s_%s_%s_1_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
var host, authority, path string
request := item.Pair.Request.Payload.(map[string]interface{})
@@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -11,7 +11,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"testing"
"time"
@@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
extension := &api.Extension{}
dissector.Register(extension)
assert.Equal(t, "http", extension.Protocol.Name)
assert.NotNil(t, extension.MatcherMap)
}
func TestMacros(t *testing.T) {
@@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
SrcPort: "1",
DstPort: "2",
}
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
stop <- true
sort.Slice(items, func(i, j int) bool {
iMarshaled, err := json.Marshal(items[i])
assert.Nil(t, err)
jMarshaled, err := json.Marshal(items[j])
assert.Nil(t, err)
return len(iMarshaled) < len(jMarshaled)
})
marshaled, err := json.Marshal(items)
assert.Nil(t, err)
@@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) {
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "")
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}

View File

@@ -8,16 +8,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {

View File

@@ -33,27 +33,27 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
@@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
@@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -3,9 +3,10 @@ package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000
type RequestResponsePair struct {
@@ -13,14 +14,17 @@ type RequestResponsePair struct {
Response Response
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func CreateResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@@ -6,15 +6,14 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -32,14 +32,14 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
Buf: make([]byte, 8192),
@@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -7,16 +7,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {

View File

@@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
}
cleaner.start()

View File

@@ -47,6 +47,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
@@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -29,8 +29,9 @@ type tcpStreamFactory struct {
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
stream *tcpStream
reqResMatcher api.RequestResponseMatcher
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
@@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
if stream.isTapTarget {
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}
@@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
@@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
stream: stream,
reqResMatcher: reqResMatcher,
createdAt: time.Now(),
})
factory.wg.Add(2)

View File

@@ -22,6 +22,8 @@ import OasModal from "../../OasModal/OasModal";
import {useCommonStyles} from "../../../helpers/commonStyle"
import {TLSWarning} from "../../TLSWarning/TLSWarning";
import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
import serviceMap from "../../assets/serviceMap.svg";
import services from "../../assets/services.svg";
const useLayoutStyles = makeStyles(() => ({
details: {
@@ -74,7 +76,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const scrollableRef = useRef(null);
const [openOasModal, setOpenOasModal] = useState(false);
const handleOpenModal = () => setOpenOasModal(true);
const handleCloseModal = () => setOpenOasModal(false);
const [showTLSWarning, setShowTLSWarning] = useState(false);
@@ -256,8 +258,14 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
}
}
const handleOpenOasModal = () => {
ws.current.close();
setOpenOasModal(true);
}
const openServiceMapModalDebounce = debounce(() => {
setServiceMapModalOpen(true)
ws.current.close();
setServiceMapModalOpen(true);
}, 500);
return (
@@ -277,17 +285,21 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
</div>
<div style={{ display: 'flex' }}>
{window["isOasEnabled"] && <Button
startIcon={<img className="custom" src={services} alt="services"></img>}
size="large"
type="submit"
variant="contained"
className={commonClasses.button}
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25 }}
onClick={handleOpenModal}
onClick={handleOpenOasModal}
>
Show OAS
</Button>}
{window["isServiceMapEnabled"] && <Button
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{marginRight:"8%"}}></img>}
size="large"
variant="contained"
className={commonClasses.button}
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
onClick={openServiceMapModalDebounce}
>
Service Map

View File

@@ -0,0 +1,32 @@
@import "../../variables.module"
.legend-scale ul
margin-top: -29px
margin-left: -27px
padding: 0
float: left
list-style: none
li
display: block
float: left
width: 50px
margin-bottom: 6px
text-align: center
font-size: 80%
list-style: none
ul.legend-labels li span
display: block
float: left
height: 15px
width: 50px
.legend-source
font-size: 70%
color: #999
clear: both
a
color: #777

View File

@@ -3,11 +3,14 @@ import { Box, Fade, Modal, Backdrop, Button } from "@material-ui/core";
import { toast } from "react-toastify";
import Api from "../../helpers/api";
import spinnerStyle from '../style/Spinner.module.sass';
import './ServiceMapModal.sass';
import spinnerImg from '../assets/spinner.svg';
import Graph from "react-graph-vis";
import debounce from 'lodash/debounce';
import ServiceMapOptions from './ServiceMapOptions'
import { useCommonStyles } from "../../helpers/commonStyle";
import refresh from "../assets/refresh.svg";
import close from "../assets/close.svg";
interface GraphData {
nodes: Node[];
@@ -29,6 +32,7 @@ interface Edge {
label: string;
title?: string;
color?: object;
font?: object;
}
interface ServiceMapNode {
@@ -104,7 +108,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
const newGraphData: GraphData = { nodes: [], edges: [] }
if (serviceMapData.nodes) {
newGraphData.nodes = serviceMapData.nodes.map(node => {
newGraphData.nodes = serviceMapData.nodes.map<Node>(node => {
return {
id: node.id,
value: node.count,
@@ -115,7 +119,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
}
if (serviceMapData.edges) {
newGraphData.edges = serviceMapData.edges.map(edge => {
newGraphData.edges = serviceMapData.edges.map<Edge>(edge => {
return {
from: edge.source.id,
to: edge.destination.id,
@@ -125,6 +129,10 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
color: edge.protocol.backgroundColor,
highlight: edge.protocol.backgroundColor
},
font: {
color: edge.protocol.backgroundColor,
strokeColor: edge.protocol.backgroundColor
},
}
})
}
@@ -141,22 +149,10 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
}, [isOpen])
useEffect(() => {
getServiceMapData()
getServiceMapData();
return () => setGraphData({ nodes: [], edges: [] })
}, [getServiceMapData])
const resetServiceMap = debounce(async () => {
try {
const serviceMapResetResponse = await api.serviceMapReset();
if (serviceMapResetResponse["status"] === "enabled") {
refreshServiceMap()
}
} catch (ex) {
toast.error("An error occurred while resetting Mizu Service Map, see console for mode details");
console.error(ex);
}
}, 500);
const refreshServiceMap = debounce(() => {
getServiceMapData();
}, 500);
@@ -180,33 +176,34 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
<img alt="spinner" src={spinnerImg} style={{ height: 50 }} />
</div>}
{!isLoading && <div style={{ height: "100%", width: "100%" }}>
<div style={{ display: "flex", justifyContent: "space-between" }}>
<div>
<Button
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight:"8%"}}></img>}
size="medium"
variant="contained"
className={commonClasses.button}
style={{ marginRight: 25 }}
onClick={() => onClose()}
>
Close
</Button>
<Button
variant="contained"
className={commonClasses.button}
style={{ marginRight: 25 }}
onClick={resetServiceMap}
>
Reset
</Button>
<Button
variant="contained"
className={commonClasses.button}
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
onClick={refreshServiceMap}
>
Refresh
</Button>
</div>
<img src={close} alt="close" onClick={() => onClose()} style={{cursor:"pointer"}}></img>
</div>
<Graph
graph={graphData}
options={ServiceMapOptions}
/>
<div className='legend-scale'>
<ul className='legend-labels'>
<li><span style={{ background: '#205cf5' }}></span>HTTP</li>
<li><span style={{ background: '#244c5a' }}></span>HTTP/2</li>
<li><span style={{ background: '#244c5a' }}></span>gRPC</li>
<li><span style={{ background: '#ff6600' }}></span>AMQP</li>
<li><span style={{ background: '#000000' }}></span>KAFKA</li>
<li><span style={{ background: '#a41e11' }}></span>REDIS</li>
</ul>
</div>
</div>}
</Box>
</Fade>

View File

@@ -1,3 +1,142 @@
const minNodeScaling = 10
const maxNodeScaling = 30
const minEdgeScaling = 1
const maxEdgeScaling = maxNodeScaling / 2
const minLabelScaling = 11
const maxLabelScaling = 16
const selectedNodeColor = "#0C0B1A"
const selectedNodeBorderColor = "#205CF5"
const selectedNodeLabelColor = "#205CF5"
const selectedEdgeLabelColor = "#205CF5"
const customScaling = (min, max, total, value) => {
if (max === min) {
return 0.5;
}
else {
const scale = 1 / (max - min);
return Math.max(0, (value - min) * scale);
}
}
const nodeSelected = (values, id, selected, hovering) => {
values.color = selectedNodeColor;
values.borderColor = selectedNodeBorderColor;
values.borderWidth = 4;
}
const nodeLabelSelected = (values, id, selected, hovering) => {
values.size = values.size + 1;
values.color = selectedNodeLabelColor;
values.strokeColor = selectedNodeLabelColor;
values.strokeWidth = 0.2
}
const edgeSelected = (values, id, selected, hovering) => {
values.opacity = 0.4;
values.width = values.width + 1;
}
const edgeLabelSelected = (values, id, selected, hovering) => {
values.size = values.size + 1;
values.color = selectedEdgeLabelColor;
values.strokeColor = selectedEdgeLabelColor;
values.strokeWidth = 0.2
}
const nodeOptions = {
shape: 'dot',
chosen: {
node: nodeSelected,
label: nodeLabelSelected,
},
color: {
background: '#494677',
border: selectedNodeColor,
},
font: {
color: selectedNodeColor,
size: 11, // px
face: 'Roboto',
background: '#FFFFFFBF',
strokeWidth: 0.2, // px
strokeColor: selectedNodeColor,
align: 'center',
multi: false,
},
// defines the node min and max sizes when zoom in/out, based on the node value
scaling: {
min: minNodeScaling,
max: maxNodeScaling,
// defines the label scaling size in px
label: {
enabled: true,
min: minLabelScaling,
max: maxLabelScaling,
maxVisible: maxLabelScaling,
drawThreshold: 5,
},
customScalingFunction: customScaling,
},
borderWidth: 2,
labelHighlightBold: true,
opacity: 1,
shadow: true,
}
const edgeOptions = {
chosen: {
edge: edgeSelected,
label: edgeLabelSelected,
},
dashes: false,
arrowStrikethrough: false,
arrows: {
to: {
enabled: true,
},
middle: {
enabled: false,
},
from: {
enabled: false,
}
},
smooth: {
enabled: true,
type: 'dynamic',
roundness: 1.0
},
font: {
color: '#000000',
size: 11, // px
face: 'Roboto',
background: '#FFFFFFCC',
strokeWidth: 0.2, // px
strokeColor: '#000000',
align: 'horizontal',
multi: false,
},
scaling: {
min: minEdgeScaling,
max: maxEdgeScaling,
label: {
enabled: true,
min: minLabelScaling,
max: maxLabelScaling,
maxVisible: maxLabelScaling,
drawThreshold: 5
},
customScalingFunction: customScaling,
},
labelHighlightBold: true,
selectionWidth: 1,
shadow: true,
}
const ServiceMapOptions = {
physics: {
enabled: true,
@@ -5,10 +144,10 @@ const ServiceMapOptions = {
barnesHut: {
theta: 0.5,
gravitationalConstant: -2000,
centralGravity: 0.3,
centralGravity: 0.4,
springLength: 180,
springConstant: 0.04,
damping: 0.09,
damping: 0.2,
avoidOverlap: 1
},
},
@@ -16,68 +155,20 @@ const ServiceMapOptions = {
hierarchical: false,
randomSeed: 1 // always on node 1
},
nodes: {
shape: 'dot',
chosen: true,
color: {
background: '#27AE60',
border: '#000000',
highlight: {
background: '#27AE60',
border: '#000000',
},
},
font: {
color: '#343434',
size: 14, // px
face: 'arial',
background: 'none',
strokeWidth: 0, // px
strokeColor: '#ffffff',
align: 'center',
multi: false,
},
borderWidth: 1.5,
borderWidthSelected: 2.5,
labelHighlightBold: true,
opacity: 1,
shadow: true,
},
edges: {
chosen: true,
dashes: false,
arrowStrikethrough: false,
arrows: {
to: {
enabled: true,
},
middle: {
enabled: false,
},
from: {
enabled: false,
}
},
smooth: {
enabled: true,
type: 'dynamic',
roundness: 1.0
},
font: {
color: '#343434',
size: 12, // px
face: 'arial',
background: 'none',
strokeWidth: 2, // px
strokeColor: '#ffffff',
align: 'horizontal',
multi: false,
},
labelHighlightBold: true,
selectionWidth: 1,
shadow: true,
},
nodes: nodeOptions,
edges: edgeOptions,
autoResize: true,
interaction: {
selectable: true,
selectConnectedEdges: true,
multiselect: true,
dragNodes: true,
dragView: true,
hover: true,
hoverConnectedEdges: true,
zoomView: true,
zoomSpeed: 1,
},
};
export default ServiceMapOptions

View File

@@ -0,0 +1,4 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M18.591 9.99997C18.591 14.7446 14.7447 18.5909 10.0001 18.5909C5.25546 18.5909 1.40918 14.7446 1.40918 9.99997C1.40918 5.25534 5.25546 1.40906 10.0001 1.40906C14.7447 1.40906 18.591 5.25534 18.591 9.99997Z" fill="#E9EBF8" stroke="#BCCEFD"/>
<path d="M13.1604 8.23038L11.95 7.01994L10.1392 8.83078L8.32832 7.01994L7.11789 8.23038L8.92872 10.0412L7.12046 11.8495L8.33089 13.0599L10.1392 11.2517L11.9474 13.0599L13.1579 11.8495L11.3496 10.0412L13.1604 8.23038Z" fill="#205CF5"/>
</svg>

After

Width:  |  Height:  |  Size: 588 B

View File

@@ -0,0 +1,3 @@
<svg width="26" height="26" viewBox="0 0 26 26" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M10.8337 11.9167H7.69308L7.69416 11.907C7.83561 11.2143 8.11247 10.5564 8.50883 9.97105C9.09865 9.10202 9.92598 8.42097 10.8922 8.00913C11.2193 7.87046 11.5606 7.7643 11.9083 7.69388C12.6297 7.54762 13.3731 7.54762 14.0945 7.69388C15.1312 7.90631 16.0825 8.41908 16.8299 9.1683L18.3639 7.63863C17.6725 6.94707 16.8546 6.39501 15.9546 6.01255C15.4956 5.81823 15.0184 5.67016 14.53 5.57055C13.5223 5.36581 12.4838 5.36581 11.4761 5.57055C10.9873 5.67057 10.5098 5.819 10.0504 6.01363C8.69682 6.58791 7.53808 7.54123 6.71374 8.7588C6.15895 9.5798 5.77099 10.5019 5.57191 11.4725C5.54158 11.6188 5.52533 11.7683 5.50366 11.9167H2.16699L6.50033 16.25L10.8337 11.9167ZM15.167 14.0834H18.3076L18.3065 14.092C18.0234 15.4806 17.205 16.7019 16.0282 17.4915C15.443 17.8882 14.7851 18.1651 14.0923 18.3062C13.3713 18.4525 12.6283 18.4525 11.9072 18.3062C11.2146 18.1648 10.5567 17.8879 9.97133 17.4915C9.68383 17.2971 9.41541 17.0758 9.16966 16.8307L7.63783 18.3625C8.32954 19.0539 9.14791 19.6056 10.0482 19.9875C10.5076 20.1825 10.9875 20.331 11.4728 20.4295C12.4801 20.6344 13.5184 20.6344 14.5257 20.4295C16.4676 20.0265 18.1757 18.8819 19.2869 17.2391C19.8412 16.4187 20.2288 15.4974 20.4277 14.5275C20.4569 14.3813 20.4742 14.2318 20.4959 14.0834H23.8337L19.5003 9.75005L15.167 14.0834Z" fill="#205CF5"/>
</svg>

After

Width:  |  Height:  |  Size: 1.4 KiB

View File

@@ -0,0 +1,15 @@
<svg width="18" height="18" viewBox="0 0 18 18" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M15.2998 5.40002L8.9998 9.00002" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M15.2998 12.6L8.99976 8.99988" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M2.7002 5.40002L2.70024 12.5001" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M9 8.99988V16.2" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M2.7 5.40005L9 1.80005" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M15.2998 5.40005L8.9998 1.80005" stroke="#A0B2FF" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M4.4999 5.39995C4.4999 6.39406 3.69402 7.19995 2.6999 7.19995C1.70579 7.19995 0.899902 6.39406 0.899902 5.39995C0.899902 4.40584 1.70579 3.59995 2.6999 3.59995C3.69402 3.59995 4.4999 4.40584 4.4999 5.39995Z" fill="#205CF5"/>
<path d="M4.4999 12.6C4.4999 13.5941 3.69402 14.4 2.6999 14.4C1.70579 14.4 0.899902 13.5941 0.899902 12.6C0.899902 11.6059 1.70579 10.8 2.6999 10.8C3.69402 10.8 4.4999 11.6059 4.4999 12.6Z" fill="#205CF5"/>
<path d="M17.1 5.39995C17.1 6.39406 16.2941 7.19995 15.3 7.19995C14.3059 7.19995 13.5 6.39406 13.5 5.39995C13.5 4.40584 14.3059 3.59995 15.3 3.59995C16.2941 3.59995 17.1 4.40584 17.1 5.39995Z" fill="#205CF5"/>
<path d="M17.1 12.6C17.1 13.5941 16.2941 14.4 15.3 14.4C14.3059 14.4 13.5 13.5941 13.5 12.6C13.5 11.6059 14.3059 10.8 15.3 10.8C16.2941 10.8 17.1 11.6059 17.1 12.6Z" fill="#205CF5"/>
<path d="M10.8002 1.79998C10.8002 2.79409 9.99431 3.59998 9.0002 3.59998C8.00608 3.59998 7.2002 2.79409 7.2002 1.79998C7.2002 0.805863 8.00608 -2.45571e-05 9.0002 -2.45571e-05C9.99431 -2.45571e-05 10.8002 0.805863 10.8002 1.79998Z" fill="#205CF5"/>
<path d="M10.8002 9.00005C10.8002 9.99416 9.99431 10.8 9.0002 10.8C8.00608 10.8 7.2002 9.99416 7.2002 9.00005C7.2002 8.00594 8.00608 7.20005 9.0002 7.20005C9.99431 7.20005 10.8002 8.00594 10.8002 9.00005Z" fill="#205CF5"/>
<path d="M10.8002 16.2C10.8002 17.1941 9.99431 18 9.0002 18C8.00608 18 7.2002 17.1941 7.2002 16.2C7.2002 15.2059 8.00608 14.4 9.0002 14.4C9.99431 14.4 10.8002 15.2059 10.8002 16.2Z" fill="#205CF5"/>
</svg>

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@@ -0,0 +1,10 @@
<svg width="30" height="30" viewBox="0 0 30 30" fill="none" xmlns="http://www.w3.org/2000/svg">
<rect x="5.5" y="6.5" width="20" height="17" rx="1.5" stroke="#A0B2FF"/>
<path d="M18.5 6.5H24C24.8284 6.5 25.5 7.17157 25.5 8V22C25.5 22.8284 24.8284 23.5 24 23.5H18.5V6.5Z" fill="#BCCEFD" stroke="#A0B2FF"/>
<rect x="19" y="9" width="5" height="2" rx="1" fill="white"/>
<rect x="8" y="9" width="4" height="2" rx="1" fill="#205CF5"/>
<rect x="8" y="14" width="7" height="2" rx="1" fill="#205CF5"/>
<rect width="3" height="2" rx="1" transform="matrix(-1 0 0 1 22 14)" fill="white"/>
<path d="M24 20C24 19.4477 23.5523 19 23 19H19V21H23C23.5523 21 24 20.5523 24 20Z" fill="white"/>
<rect x="8" y="19" width="7" height="2" rx="1" fill="#205CF5"/>
</svg>

After

Width:  |  Height:  |  Size: 747 B

View File

@@ -9,7 +9,7 @@ export const useCommonStyles = makeStyles(() => ({
fontSize: 12,
padding: "8px 12px",
borderRadius: "6px ! important",
"&:hover": {
backgroundColor: "#205cf5",
},
@@ -27,6 +27,11 @@ export const useCommonStyles = makeStyles(() => ({
backgroundColor: "transparent",
},
},
imagedButton: {
padding: "0px 14px"
},
textField: {
outline: 0,
background: "white",