Compare commits

...

13 Commits

Author SHA1 Message Date
Gustavo Massaneiro
bb3ae1ef70 Service Map node key as entry.Name instead of entry.IP (#818) 2022-02-16 09:01:59 +02:00
AmitUp9
5dfa94d76e service map - reset button and function deleted (#805)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-15 22:57:56 +02:00
Alex Haiut
dfe63f2318 enabled service-map by default (#816) 2022-02-15 22:41:57 +02:00
Alex Haiut
9cf64a43f5 modified namespace in helm command (#814) 2022-02-15 17:14:37 +02:00
Igor Gov
bf2362d836 Update mizu install command (#811) 2022-02-15 16:05:28 +02:00
Igor Gov
1c11523d9d View command - no version check (#810) 2022-02-15 15:49:44 +02:00
Nimrod Gilboa Markevich
150a87413d Print http error response details (#792) 2022-02-15 12:29:34 +02:00
Igor Gov
f7221a7355 Sending telemetry config to server (#808) 2022-02-15 11:08:16 +02:00
RamiBerm
4197ec198c Tag entries destination namespace (#803)
* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent
2022-02-15 10:51:38 +02:00
Nimrod Gilboa Markevich
5484b7c491 Force DaemonSet apply (#804)
Required for apply to work if the DaemonSet is created by another program e.g. Helm.
2022-02-15 10:16:24 +02:00
M. Mert Yıldıran
041223b558 Move the request-response matcher's scope from global-level to TCP stream-level (#793)
* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-14 18:16:58 +03:00
Igor Gov
71c04d20ef Support rancher client config (#802) 2022-02-14 16:48:33 +02:00
Igor Gov
8852bac77b Adding logs of k8s client config (#800) 2022-02-14 06:26:57 +02:00
36 changed files with 271 additions and 280 deletions

View File

@@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
@@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
if resolvedSourceObject == nil {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
if resolvedDestinationObject == nil {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination
return resolvedSource, resolvedDestination, namespace
}
func CheckIsServiceIP(address string) bool {

View File

@@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedNameObject != nil {
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}

View File

@@ -102,13 +102,13 @@ func (s *ServiceMapControllerSuite) TestGet() {
// response nodes
aNode := servicemap.ServiceMapNode{
Id: 1,
Name: TCPEntryA.IP,
Name: TCPEntryA.Name,
Entry: TCPEntryA,
Count: 1,
}
bNode := servicemap.ServiceMapNode{
Id: 2,
Name: TCPEntryB.IP,
Name: TCPEntryB.Name,
Entry: TCPEntryB,
Count: 1,
}

View File

@@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)

View File

@@ -30,6 +30,11 @@ type Resolver struct {
namespace string
}
type ResolvedObjectInfo struct {
FullAddress string
Namespace string
}
func (resolver *Resolver) Start(ctx context.Context) {
if !resolver.isStarted {
resolver.isStarted = true
@@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) string {
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
resolvedName, isFound := resolver.nameMap.Get(name)
if !isFound {
return ""
return nil
}
return resolvedName.(string)
return resolvedName.(*ResolvedObjectInfo)
}
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
@@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
}
if event.Type == watch.Deleted {
pod := event.Object.(*corev1.Pod)
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
}
case <-ctx.Done():
watcher.Stop()
@@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
}
if subset.Addresses != nil {
for _, address := range subset.Addresses {
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
for _, port := range ports {
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
}
}
}
@@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
service := event.Object.(*corev1.Service)
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
if service.Spec.Ports != nil {
for _, port := range service.Spec.Ports {
if port.Port > 0 {
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
}
}
}
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
}
}
case <-ctx.Done():
@@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
}
}
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.serviceMap.Remove(key)
} else {
resolver.serviceMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
}
}

View File

@@ -172,20 +172,33 @@ func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Pro
return
}
srcEntry := &entryData{
key: key(src.IP),
entry: src,
}
if len(srcEntry.entry.Name) == 0 {
var srcEntry *entryData
var dstEntry *entryData
if len(src.Name) == 0 {
srcEntry = &entryData{
key: key(src.IP),
entry: src,
}
srcEntry.entry.Name = UnresolvedNodeName
} else {
srcEntry = &entryData{
key: key(src.Name),
entry: src,
}
}
dstEntry := &entryData{
key: key(dst.IP),
entry: dst,
}
if len(dstEntry.entry.Name) == 0 {
if len(dst.Name) == 0 {
dstEntry = &entryData{
key: key(dst.IP),
entry: dst,
}
dstEntry.entry.Name = UnresolvedNodeName
} else {
dstEntry = &entryData{
key: key(dst.Name),
entry: dst,
}
}
s.addEdge(srcEntry, dstEntry, p)

View File

@@ -268,9 +268,14 @@ func (s *ServiceMapEnabledSuite) TestServiceMap() {
assert.LessOrEqual(node.Id, expectedNodeCount)
// entry
// node.Name is the key of the node, key = entry.IP
// node.Name is the key of the node, key = entry.Name by default
// entry.Name is the name of the service and could be unresolved
assert.Equal(node.Name, node.Entry.IP)
// when entry.Name is unresolved, key = entry.IP
if node.Entry.Name == UnresolvedNodeName {
assert.Equal(node.Name, node.Entry.IP)
} else {
assert.Equal(node.Name, node.Entry.Name)
}
assert.Equal(Port, node.Entry.Port)
assert.Equal(entryName, node.Entry.Name)
@@ -320,16 +325,24 @@ func (s *ServiceMapEnabledSuite) TestServiceMap() {
cdEdge := -1
acEdge := -1
var validateEdge = func(edge ServiceMapEdge, sourceEntryName string, destEntryName string, protocolName string, protocolCount int) {
// source
// source node
assert.Contains(nodeIds, edge.Source.Id)
assert.LessOrEqual(edge.Source.Id, expectedNodeCount)
assert.Equal(edge.Source.Name, edge.Source.Entry.IP)
if edge.Source.Entry.Name == UnresolvedNodeName {
assert.Equal(edge.Source.Name, edge.Source.Entry.IP)
} else {
assert.Equal(edge.Source.Name, edge.Source.Entry.Name)
}
assert.Equal(sourceEntryName, edge.Source.Entry.Name)
// destination
// destination node
assert.Contains(nodeIds, edge.Destination.Id)
assert.LessOrEqual(edge.Destination.Id, expectedNodeCount)
assert.Equal(edge.Destination.Name, edge.Destination.Entry.IP)
if edge.Destination.Entry.Name == UnresolvedNodeName {
assert.Equal(edge.Destination.Name, edge.Destination.Entry.IP)
} else {
assert.Equal(edge.Destination.Name, edge.Destination.Entry.Name)
}
assert.Equal(destEntryName, edge.Destination.Entry.Name)
// protocol

View File

@@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if response, err := provider.client.Get(echoUrl); err != nil {
if _, err := provider.get(echoUrl); err != nil {
return false, err
} else if response.StatusCode != 200 {
return false, fmt.Errorf("invalid status code %v", response.StatusCode)
} else {
return true, nil
}
@@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
return nil
@@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
return nil
@@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.client.Get(generalStatsUrl)
response, requestErr := provider.get(generalStatsUrl)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
} else if response.StatusCode != 200 {
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
}
defer response.Body.Close()
@@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.client.Do(req)
statusResp, err := provider.do(req)
if err != nil {
return "", err
}
@@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

View File

@@ -1,10 +1,9 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@@ -12,13 +11,13 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
runMizuInstall()
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if config.Config.IsNsRestrictedMode() {
return fmt.Errorf("install is not supported in restricted namespace mode")
}
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace")
return nil
},
@@ -27,4 +26,3 @@ var installCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(installCmd)
}

View File

@@ -1,81 +0,0 @@
package cmd
import (
"context"
"errors"
"fmt"
"github.com/creasty/defaults"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/resources"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func runMizuInstall() {
kubernetesProvider, err := getKubernetesProviderForCli()
if err != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
var serializedValidationRules string
var serializedContract string
var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000
defaultResources := shared.Resources{}
if err := defaults.Set(&defaultResources); err != nil {
logger.Log.Debug(err)
}
mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources)
serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err)))
return
}
if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules,
serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(),
config.Config.MizuResourcesNamespace, config.Config.AgentImage,
config.Config.KratosImage, config.Config.KetoImage,
nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(),
config.Config.LogLevel(), false); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance")
} else {
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
}
return
}
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
}
func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig {
mizuAgentConfig := shared.MizuAgentConfig{
MaxDBSizeBytes: maxDBSizeBytes,
AgentImage: config.Config.AgentImage,
PullPolicy: config.Config.ImagePullPolicyStr,
LogLevel: config.Config.LogLevel(),
TapperResources: tapperResources,
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
AgentDatabasePath: shared.DataDirPath,
StandaloneMode: true,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Elastic: config.Config.Elastic,
}
return &mizuAgentConfig
}

View File

@@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
AgentDatabasePath: shared.DataDirPath,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Telemetry: config.Config.Telemetry,
Elastic: config.Config.Elastic,
}

View File

@@ -3,13 +3,13 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/utils"
"net/http"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
@@ -62,14 +62,5 @@ func runMizuView() {
uiUtils.OpenBrowser(url)
}
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()
return
} else if !isCompatible {
cancel()
return
}
utils.WaitForFinish(ctx, cancel)
}

View File

@@ -38,7 +38,7 @@ type ConfigStruct struct {
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
ServiceMap bool `yaml:"service-map" default:"false"`
ServiceMap bool `yaml:"service-map" default:"true"`
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
Elastic shared.ElasticConfig `yaml:"elastic"`
}

View File

@@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
@@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
@@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}

View File

@@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
return nil, err
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host)
if err != nil {
return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err)
}
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
logger.Log.Debugf("Http dialer url %v", serverURL)
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
}

View File

@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
StandaloneMode bool `json:"standaloneMode"`
ServiceMap bool `json:"serviceMap"`
OAS bool `json:"oas"`
Telemetry bool `json:"telemetry"`
Elastic ElasticConfig `json:"elastic"`
}

View File

@@ -39,10 +39,9 @@ type TCP struct {
}
type Extension struct {
Protocol *Protocol
Path string
Dissector Dissector
MatcherMap *sync.Map
Protocol *Protocol
Path string
Dissector Dissector
}
type ConnectionInfo struct {
@@ -62,7 +61,6 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
@@ -100,10 +98,15 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
type RequestResponseMatcher interface {
GetMap() *sync.Map
}
type Emitting struct {
@@ -125,6 +128,7 @@ type Entry struct {
Protocol Protocol `json:"proto"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@@ -22,6 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration
stats CleanerStats
statsMutex sync.Mutex
streamsMap *tcpStreamMap
}
func (cl *Cleaner) clean() {
@@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
for _, extension := range extensions {
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout))
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
if reqResMatcher == nil {
return true
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
}
return true
})
cl.statsMutex.Lock()
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())

View File

@@ -42,7 +42,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),
@@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
@@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -84,14 +84,15 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &http11protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
@@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
"%s_%s_%s_%s_1_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
var host, authority, path string
request := item.Pair.Request.Payload.(map[string]interface{})
@@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -11,7 +11,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"testing"
"time"
@@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
extension := &api.Extension{}
dissector.Register(extension)
assert.Equal(t, "http", extension.Protocol.Name)
assert.NotNil(t, extension.MatcherMap)
}
func TestMacros(t *testing.T) {
@@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
SrcPort: "1",
DstPort: "2",
}
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
stop <- true
sort.Slice(items, func(i, j int) bool {
iMarshaled, err := json.Marshal(items[i])
assert.Nil(t, err)
jMarshaled, err := json.Marshal(items[j])
assert.Nil(t, err)
return len(iMarshaled) < len(jMarshaled)
})
marshaled, err := json.Marshal(items)
assert.Nil(t, err)
@@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) {
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "")
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}

View File

@@ -8,16 +8,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {

View File

@@ -33,27 +33,27 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
@@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
@@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -3,9 +3,10 @@ package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000
type RequestResponsePair struct {
@@ -13,14 +14,17 @@ type RequestResponsePair struct {
Response Response
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func CreateResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@@ -6,15 +6,14 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -32,14 +32,14 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
Buf: make([]byte, 8192),
@@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -7,16 +7,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {

View File

@@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
}
cleaner.start()

View File

@@ -47,6 +47,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
@@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -29,8 +29,9 @@ type tcpStreamFactory struct {
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
stream *tcpStream
reqResMatcher api.RequestResponseMatcher
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
@@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
if stream.isTapTarget {
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}
@@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
@@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
stream: stream,
reqResMatcher: reqResMatcher,
createdAt: time.Now(),
})
factory.wg.Add(2)

View File

@@ -10,8 +10,7 @@ import debounce from 'lodash/debounce';
import ServiceMapOptions from './ServiceMapOptions'
import { useCommonStyles } from "../../helpers/commonStyle";
import refresh from "../assets/refresh.svg";
import reset from "../assets/reset.svg";
import close from "../assets/close.svg"
import close from "../assets/close.svg";
interface GraphData {
nodes: Node[];
@@ -154,19 +153,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
return () => setGraphData({ nodes: [], edges: [] })
}, [getServiceMapData])
const resetServiceMap = debounce(async () => {
try {
const serviceMapResetResponse = await api.serviceMapReset();
if (serviceMapResetResponse["status"] === "enabled") {
refreshServiceMap()
}
} catch (ex) {
toast.error("An error occurred while resetting Mizu Service Map, see console for mode details");
console.error(ex);
}
}, 500);
const refreshServiceMap = debounce(() => {
getServiceMapData();
}, 500);
@@ -192,16 +178,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
{!isLoading && <div style={{ height: "100%", width: "100%" }}>
<div style={{ display: "flex", justifyContent: "space-between" }}>
<div>
<Button
startIcon={<img src={reset} className="custom" alt="reset" style={{ marginRight:"8%"}}></img>}
size="large"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25, paddingTop: "3px", paddingBottom: "1px"}}
onClick={resetServiceMap}
>
Reset
</Button>
<Button
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight:"8%"}}></img>}
size="medium"

View File

@@ -1,4 +0,0 @@
<svg width="22" height="22" viewBox="0 0 22 22" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M11 14C12.114 14 13 13.1127 13 12C13 10.8873 12.114 10 11 10C9.886 10 9 10.8873 9 12C9 13.1127 9.886 14 11 14Z" fill="#205CF5"/>
<path d="M19.0823 10.2539C18.8662 9.1982 18.4442 8.19548 17.8402 7.30312C17.2467 6.42521 16.4906 5.66909 15.6127 5.07562C14.7202 4.47184 13.7175 4.04978 12.6619 3.83354C12.1074 3.72109 11.5429 3.6658 10.9771 3.66854V1.83337L7.33333 4.58337L10.9771 7.33337V5.50187C11.4208 5.50004 11.8644 5.54221 12.2925 5.63021C13.1129 5.79833 13.8923 6.12632 14.586 6.59546C15.2702 7.05676 15.859 7.64561 16.3203 8.32979C17.0366 9.38875 17.4185 10.6383 17.4167 11.9167C17.4165 12.7746 17.2451 13.6238 16.9125 14.4146C16.7507 14.7955 16.5531 15.1602 16.3222 15.5036C16.0904 15.845 15.827 16.1639 15.5357 16.456C14.6483 17.3417 13.5219 17.9492 12.2943 18.2041C11.4407 18.3765 10.5612 18.3765 9.7075 18.2041C8.88668 18.0358 8.10704 17.7075 7.41308 17.238C6.72969 16.7771 6.14148 16.1888 5.68058 15.5055C4.96518 14.4454 4.58306 13.1956 4.58333 11.9167H2.75C2.75098 13.5609 3.24215 15.1675 4.16075 16.5312C4.75461 17.4077 5.50996 18.163 6.38642 18.7569C7.74824 19.6786 9.3556 20.1697 11 20.1667C11.5585 20.1667 12.1156 20.1105 12.6628 19.999C13.7177 19.7811 14.7197 19.3592 15.6127 18.7569C16.0511 18.4615 16.4597 18.1241 16.8328 17.7495C17.2063 17.3749 17.5439 16.9661 17.8411 16.5285C18.762 15.167 19.2528 13.5604 19.25 11.9167C19.25 11.3582 19.1938 10.8011 19.0823 10.2539Z" fill="#205CF5"/>
</svg>

Before

Width:  |  Height:  |  Size: 1.5 KiB