Compare commits

...

20 Commits
26.0 ... 27.2

Author SHA1 Message Date
Alex Haiut
0ef16bd55a fixed typo in install - helm command (#828)
#patch hotfix to main, release 27.2
2022-02-18 15:38:52 +02:00
Igor Gov
de42de9d62 Merge pull request #817 from up9inc/develop
Develop -> main (Release 27.1) #patch
2022-02-16 10:17:20 +02:00
Gustavo Massaneiro
bb3ae1ef70 Service Map node key as entry.Name instead of entry.IP (#818) 2022-02-16 09:01:59 +02:00
AmitUp9
5dfa94d76e service map - reset button and function deleted (#805)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-15 22:57:56 +02:00
Alex Haiut
dfe63f2318 enabled service-map by default (#816) 2022-02-15 22:41:57 +02:00
gadotroee
026745ac8e #major Develop -> main (Release 27.0)
#major (Merge pull request #815 from up9inc/develop)
2022-02-15 17:46:59 +02:00
Alex Haiut
9cf64a43f5 modified namespace in helm command (#814) 2022-02-15 17:14:37 +02:00
gadotroee
5e07924aca Revert "Develop -> main (Release 27.0) (#812)" (#813)
This reverts commit 77078e78d1.

Co-authored-by: Igor Gov <iggvrv@gmail.com>
2022-02-15 17:13:08 +02:00
Igor Gov
77078e78d1 Develop -> main (Release 27.0) (#812)
* ws pause after buttons click (#798)

* ws pause after buttons click

* name change

Co-authored-by: Igor Gov <iggvrv@gmail.com>

* moved CHANGELOG to Mizu wiki in Github (#801)

* Adding logs of k8s client config (#800)

* Support rancher client config (#802)

* Move the request-response matcher's scope from global-level to TCP stream-level (#793)

* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Force DaemonSet apply (#804)

Required for apply to work if the DaemonSet is created by another program e.g. Helm.

* Tag entries destination namespace (#803)

* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent

* Sending telemetry config to server (#808)

* Print http error response details (#792)

* View command - no version check (#810)

* Update mizu install command (#811)

Co-authored-by: AmitUp9 <96980485+AmitUp9@users.noreply.github.com>
Co-authored-by: Alex Haiut <alex@up9.com>
Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
2022-02-15 16:54:48 +02:00
Igor Gov
bf2362d836 Update mizu install command (#811) 2022-02-15 16:05:28 +02:00
Igor Gov
1c11523d9d View command - no version check (#810) 2022-02-15 15:49:44 +02:00
Nimrod Gilboa Markevich
150a87413d Print http error response details (#792) 2022-02-15 12:29:34 +02:00
Igor Gov
f7221a7355 Sending telemetry config to server (#808) 2022-02-15 11:08:16 +02:00
RamiBerm
4197ec198c Tag entries destination namespace (#803)
* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent
2022-02-15 10:51:38 +02:00
Nimrod Gilboa Markevich
5484b7c491 Force DaemonSet apply (#804)
Required for apply to work if the DaemonSet is created by another program e.g. Helm.
2022-02-15 10:16:24 +02:00
M. Mert Yıldıran
041223b558 Move the request-response matcher's scope from global-level to TCP stream-level (#793)
* Create a new request-response matcher for each TCP stream

* Fix the `ident` formats in request-response matchers

* Don't sort the items in the HTTP tests

* Update tap/extensions/kafka/matcher.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Temporarily change the bucket folder to the new expected

* Bring back the `deleteOlderThan` method

* Use `api.RequestResponseMatcher` instead of `interface{}` as type

* Use `api.RequestResponseMatcher` instead of `interface{}` as type (more)

* Update the key format comments

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-14 18:16:58 +03:00
Igor Gov
71c04d20ef Support rancher client config (#802) 2022-02-14 16:48:33 +02:00
Igor Gov
8852bac77b Adding logs of k8s client config (#800) 2022-02-14 06:26:57 +02:00
Alex Haiut
59d21e19b7 moved CHANGELOG to Mizu wiki in Github (#801) 2022-02-14 00:16:33 +02:00
AmitUp9
884cb791fc ws pause after buttons click (#798)
* ws pause after buttons click

* name change

Co-authored-by: Igor Gov <iggvrv@gmail.com>
2022-02-13 17:03:33 +02:00
39 changed files with 282 additions and 356 deletions

View File

@@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
@@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
if resolvedSourceObject == nil {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
if resolvedDestinationObject == nil {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination
return resolvedSource, resolvedDestination, namespace
}
func CheckIsServiceIP(address string) bool {

View File

@@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedNameObject != nil {
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}

View File

@@ -102,13 +102,13 @@ func (s *ServiceMapControllerSuite) TestGet() {
// response nodes
aNode := servicemap.ServiceMapNode{
Id: 1,
Name: TCPEntryA.IP,
Name: TCPEntryA.Name,
Entry: TCPEntryA,
Count: 1,
}
bNode := servicemap.ServiceMapNode{
Id: 2,
Name: TCPEntryB.IP,
Name: TCPEntryB.Name,
Entry: TCPEntryB,
Count: 1,
}

View File

@@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)

View File

@@ -30,6 +30,11 @@ type Resolver struct {
namespace string
}
type ResolvedObjectInfo struct {
FullAddress string
Namespace string
}
func (resolver *Resolver) Start(ctx context.Context) {
if !resolver.isStarted {
resolver.isStarted = true
@@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) string {
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
resolvedName, isFound := resolver.nameMap.Get(name)
if !isFound {
return ""
return nil
}
return resolvedName.(string)
return resolvedName.(*ResolvedObjectInfo)
}
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
@@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
}
if event.Type == watch.Deleted {
pod := event.Object.(*corev1.Pod)
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
}
case <-ctx.Done():
watcher.Stop()
@@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
}
if subset.Addresses != nil {
for _, address := range subset.Addresses {
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
for _, port := range ports {
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
}
}
}
@@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
service := event.Object.(*corev1.Service)
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
if service.Spec.Ports != nil {
for _, port := range service.Spec.Ports {
if port.Port > 0 {
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
}
}
}
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
}
}
case <-ctx.Done():
@@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
}
}
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.serviceMap.Remove(key)
} else {
resolver.serviceMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
}
}

View File

@@ -172,20 +172,33 @@ func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Pro
return
}
srcEntry := &entryData{
key: key(src.IP),
entry: src,
}
if len(srcEntry.entry.Name) == 0 {
var srcEntry *entryData
var dstEntry *entryData
if len(src.Name) == 0 {
srcEntry = &entryData{
key: key(src.IP),
entry: src,
}
srcEntry.entry.Name = UnresolvedNodeName
} else {
srcEntry = &entryData{
key: key(src.Name),
entry: src,
}
}
dstEntry := &entryData{
key: key(dst.IP),
entry: dst,
}
if len(dstEntry.entry.Name) == 0 {
if len(dst.Name) == 0 {
dstEntry = &entryData{
key: key(dst.IP),
entry: dst,
}
dstEntry.entry.Name = UnresolvedNodeName
} else {
dstEntry = &entryData{
key: key(dst.Name),
entry: dst,
}
}
s.addEdge(srcEntry, dstEntry, p)

View File

@@ -268,9 +268,14 @@ func (s *ServiceMapEnabledSuite) TestServiceMap() {
assert.LessOrEqual(node.Id, expectedNodeCount)
// entry
// node.Name is the key of the node, key = entry.IP
// node.Name is the key of the node, key = entry.Name by default
// entry.Name is the name of the service and could be unresolved
assert.Equal(node.Name, node.Entry.IP)
// when entry.Name is unresolved, key = entry.IP
if node.Entry.Name == UnresolvedNodeName {
assert.Equal(node.Name, node.Entry.IP)
} else {
assert.Equal(node.Name, node.Entry.Name)
}
assert.Equal(Port, node.Entry.Port)
assert.Equal(entryName, node.Entry.Name)
@@ -320,16 +325,24 @@ func (s *ServiceMapEnabledSuite) TestServiceMap() {
cdEdge := -1
acEdge := -1
var validateEdge = func(edge ServiceMapEdge, sourceEntryName string, destEntryName string, protocolName string, protocolCount int) {
// source
// source node
assert.Contains(nodeIds, edge.Source.Id)
assert.LessOrEqual(edge.Source.Id, expectedNodeCount)
assert.Equal(edge.Source.Name, edge.Source.Entry.IP)
if edge.Source.Entry.Name == UnresolvedNodeName {
assert.Equal(edge.Source.Name, edge.Source.Entry.IP)
} else {
assert.Equal(edge.Source.Name, edge.Source.Entry.Name)
}
assert.Equal(sourceEntryName, edge.Source.Entry.Name)
// destination
// destination node
assert.Contains(nodeIds, edge.Destination.Id)
assert.LessOrEqual(edge.Destination.Id, expectedNodeCount)
assert.Equal(edge.Destination.Name, edge.Destination.Entry.IP)
if edge.Destination.Entry.Name == UnresolvedNodeName {
assert.Equal(edge.Destination.Name, edge.Destination.Entry.IP)
} else {
assert.Equal(edge.Destination.Name, edge.Destination.Entry.Name)
}
assert.Equal(destEntryName, edge.Destination.Entry.Name)
// protocol

View File

@@ -1,5 +1,5 @@
# Mizu release _VER_
Full changelog for stable release see in [docs](https://github.com/up9inc/mizu/blob/main/docs/CHANGELOG.md)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)
## Download Mizu for your platform

View File

@@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -57,10 +59,8 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if response, err := provider.client.Get(echoUrl); err != nil {
if _, err := provider.get(echoUrl); err != nil {
return false, err
} else if response.StatusCode != 200 {
return false, fmt.Errorf("invalid status code %v", response.StatusCode)
} else {
return true, nil
}
@@ -72,10 +72,8 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
return nil
@@ -91,10 +89,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
return nil
@@ -105,11 +101,9 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.client.Get(generalStatsUrl)
response, requestErr := provider.get(generalStatsUrl)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
} else if response.StatusCode != 200 {
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
}
defer response.Body.Close()
@@ -132,7 +126,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.client.Do(req)
statusResp, err := provider.do(req)
if err != nil {
return "", err
}
@@ -145,3 +139,40 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

View File

@@ -1,10 +1,9 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@@ -12,13 +11,13 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
runMizuInstall()
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
if config.Config.IsNsRestrictedMode() {
return fmt.Errorf("install is not supported in restricted namespace mode")
}
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu mizu --repo https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu mizu --repo https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace")
return nil
},
@@ -27,4 +26,3 @@ var installCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(installCmd)
}

View File

@@ -1,81 +0,0 @@
package cmd
import (
"context"
"errors"
"fmt"
"github.com/creasty/defaults"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/resources"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func runMizuInstall() {
kubernetesProvider, err := getKubernetesProviderForCli()
if err != nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
var serializedValidationRules string
var serializedContract string
var defaultMaxEntriesDBSizeBytes int64 = 200 * 1000 * 1000
defaultResources := shared.Resources{}
if err := defaults.Set(&defaultResources); err != nil {
logger.Log.Debug(err)
}
mizuAgentConfig := getInstallMizuAgentConfig(defaultMaxEntriesDBSizeBytes, defaultResources)
serializedMizuConfig, err := getSerializedMizuAgentConfig(mizuAgentConfig)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error serializing mizu config: %v", errormessage.FormatError(err)))
return
}
if err = resources.CreateInstallMizuResources(ctx, kubernetesProvider, serializedValidationRules,
serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(),
config.Config.MizuResourcesNamespace, config.Config.AgentImage,
config.Config.KratosImage, config.Config.KetoImage,
nil, defaultMaxEntriesDBSizeBytes, defaultResources, config.Config.ImagePullPolicy(),
config.Config.LogLevel(), false); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
logger.Log.Info("Mizu is already running in this namespace, run `mizu clean` to remove the currently running Mizu instance")
} else {
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
}
return
}
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
}
func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Resources) *shared.MizuAgentConfig {
mizuAgentConfig := shared.MizuAgentConfig{
MaxDBSizeBytes: maxDBSizeBytes,
AgentImage: config.Config.AgentImage,
PullPolicy: config.Config.ImagePullPolicyStr,
LogLevel: config.Config.LogLevel(),
TapperResources: tapperResources,
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
AgentDatabasePath: shared.DataDirPath,
StandaloneMode: true,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Elastic: config.Config.Elastic,
}
return &mizuAgentConfig
}

View File

@@ -162,6 +162,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
AgentDatabasePath: shared.DataDirPath,
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Telemetry: config.Config.Telemetry,
Elastic: config.Config.Elastic,
}

View File

@@ -3,13 +3,13 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/utils"
"net/http"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
@@ -62,14 +62,5 @@ func runMizuView() {
uiUtils.OpenBrowser(url)
}
if isCompatible, err := version.CheckVersionCompatibility(apiServerProvider); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()
return
} else if !isCompatible {
cancel()
return
}
utils.WaitForFinish(ctx, cancel)
}

View File

@@ -38,7 +38,7 @@ type ConfigStruct struct {
ConfigFilePath string `yaml:"config-path,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
ServiceMap bool `yaml:"service-map" default:"false"`
ServiceMap bool `yaml:"service-map" default:"true"`
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
Elastic shared.ElasticConfig `yaml:"elastic"`
}

View File

@@ -1,72 +1 @@
# CHANGELOG
This document summarizes main and fixes changes published in stable (aka `main`) branch of this project.
Ongoing work and development releases are under `develop` branch.
## 0.24.0
### main features
* ARM64 support -- Mizu is now available for ARM 64bit architecture
* Now you can run Mizu with `minikube` on your Apple M1 laptop or any other ARM-based hosts
* New command helps user verify Mizu deployment
* Run `mizu check` to verify Mizu was deployed successfully
* `mizu check` verifies version compatibility, resources and permissions required by Mizu
* EXPERIMENTAL: Service Map - graph of all service interactions
* Arrow direction show client to server connection
* Graph edge width reflects volume of traffic captured between the services
* to enable this experimental feature use `--set service-map=true` flag
### improvements
* Mizu container images are now served from [Docker Hub](https://hub.docker.com/r/up9inc/mizu), as multi-architecture images (arm64, amd64)
* in Mizu GUI the filter query can now be applied by pressing CONTROL/COMMAND + ENTER
* try port-forwarding if http-proxy connection to Mizu API server is not available
### notable bug fixes
* Fixed HTTP/1.0 presentation which was shown as HTTP/1.1
* Fixed handling of long-living TCP connections, improves capturing gRPC and HTTP/2 traffic, and helps in service-mesh setups (istio, linkerd)
## 0.23.0
### notable bug fixes
* fixed errors in Redis protocol parser (better handling of Array and Bulk String message types)
## 0.22.0
### main features
* Service Mesh support -- mizu is now capable to tap mTLS traffic between pods connected by Istio service mesh
* Use `--service-mesh` option to enable this feature
* New installation option - have the same Mizu functionality as long living pods in your cluster, with password protection
* To install use `mizu install` command
* To access use `mizu view` or `kubectl -n mizu port-forward svc/mizu-api-server`
* To uninstall run `mizu clean`
* At first login
* Set admin password as prompted, use it to login to mizu later on.
* After login, user should select cluster namespaces to tap: by default all namespaces in the cluster are selected, user can select/unselect according to their needs. These settings are retained and can be modified at any time via Settings menu (cog icon on the top-right)
### improvements
* improved Mizu permissions/roles logic to support clusters with strict PodSecurityPolicy (PSP) -- see [PERMISSIONS](PERMISSIONS.md) doc for more details
### notable bug fixes
* mizu now works properly when API service is exposed via HTTPS url
* mizu now properly displays KAFKA message body
## 0.21.0
### main features
* New traffic search & stream exprience
* Rich query language with full-text search capabilities on headers & body
* Distinct live-streaming vs paging/browsing modes, all with filter applied
### improvements
* GUI - source and destination IP addresses & service names for each traffic item
* GUI - Mizu health - display warning sign in top bar when not all requested pods are successfully tapped
* GUI - pod tapping status reflected in the list (ok or problem)
* Mizu telemetry - report platform type
### fixes
* Request duration and body size properly shown in GUI (instead of -1)
Mizu CHANGELOG is now part of [Mizu wiki](https://github.com/up9inc/mizu/wiki/CHANGELOG)

View File

@@ -76,6 +76,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
logger.Log.Debugf("K8s client config, host: %s, api path: %s, user agent: %s", restClientConfig.Host, restClientConfig.APIPath, restClientConfig.UserAgent)
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
@@ -952,6 +954,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
@@ -960,7 +967,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}

View File

@@ -128,9 +128,14 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
return nil, err
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
clientConfigHostUrl, err := url.Parse(kubernetesProvider.clientConfig.Host)
if err != nil {
return nil, fmt.Errorf("Failed parsing client config host URL %s, error %w", kubernetesProvider.clientConfig.Host, err)
}
path := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", clientConfigHostUrl.Path, namespace, podName)
serverURL := url.URL{Scheme: "https", Path: path, Host: clientConfigHostUrl.Host}
logger.Log.Debugf("Http dialer url %v", serverURL)
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL), nil
}

View File

@@ -43,6 +43,7 @@ type MizuAgentConfig struct {
StandaloneMode bool `json:"standaloneMode"`
ServiceMap bool `json:"serviceMap"`
OAS bool `json:"oas"`
Telemetry bool `json:"telemetry"`
Elastic ElasticConfig `json:"elastic"`
}

View File

@@ -39,10 +39,9 @@ type TCP struct {
}
type Extension struct {
Protocol *Protocol
Path string
Dissector Dissector
MatcherMap *sync.Map
Protocol *Protocol
Path string
Dissector Dissector
}
type ConnectionInfo struct {
@@ -62,7 +61,6 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
@@ -100,10 +98,15 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
type RequestResponseMatcher interface {
GetMap() *sync.Map
}
type Emitting struct {
@@ -125,6 +128,7 @@ type Entry struct {
Protocol Protocol `json:"proto"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@@ -22,6 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration
stats CleanerStats
statsMutex sync.Mutex
streamsMap *tcpStreamMap
}
func (cl *Cleaner) clean() {
@@ -32,10 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
for _, extension := range extensions {
deleted := deleteOlderThan(extension.MatcherMap, startCleanTime.Add(-cl.connectionTimeout))
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
if reqResMatcher == nil {
return true
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
}
return true
})
cl.statsMutex.Lock()
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())

View File

@@ -42,7 +42,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),
@@ -300,6 +301,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return nil
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -58,7 +58,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -78,7 +78,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
}
case http.Response:
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
@@ -110,7 +110,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -130,8 +130,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -153,7 +152,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -174,8 +173,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -84,14 +84,15 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &http11protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
@@ -124,7 +125,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -133,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -144,7 +145,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s->%s %s->%s 1 %s",
"%s_%s_%s_%s_1_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -164,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -181,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
var host, authority, path string
request := item.Pair.Request.Payload.(map[string]interface{})
@@ -279,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -472,6 +474,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -11,7 +11,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"testing"
"time"
@@ -39,7 +38,6 @@ func TestRegister(t *testing.T) {
extension := &api.Extension{}
dissector.Register(extension)
assert.Equal(t, "http", extension.Protocol.Name)
assert.NotNil(t, extension.MatcherMap)
}
func TestMacros(t *testing.T) {
@@ -123,7 +121,8 @@ func TestDissect(t *testing.T) {
SrcPort: "1",
DstPort: "2",
}
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -141,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options)
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -155,14 +154,6 @@ func TestDissect(t *testing.T) {
stop <- true
sort.Slice(items, func(i, j int) bool {
iMarshaled, err := json.Marshal(items[i])
assert.Nil(t, err)
jMarshaled, err := json.Marshal(items[j])
assert.Nil(t, err)
return len(iMarshaled) < len(jMarshaled)
})
marshaled, err := json.Marshal(items)
assert.Nil(t, err)
@@ -214,7 +205,7 @@ func TestAnalyze(t *testing.T) {
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "")
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}

View File

@@ -8,16 +8,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{incremental_counter}_{proto_ident}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {

View File

@@ -33,27 +33,27 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
@@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
@@ -215,6 +216,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -3,9 +3,10 @@ package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = CreateResponseRequestMatcher() // global
const maxTry int = 3000
type RequestResponsePair struct {
@@ -13,14 +14,17 @@ type RequestResponsePair struct {
Response Response
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id}
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func CreateResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -214,8 +214,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -44,8 +44,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
key := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@@ -6,15 +6,14 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
@@ -36,15 +35,14 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
"%s_%s_%s_%s_%d",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,

View File

@@ -32,14 +32,14 @@ type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
Buf: make([]byte, 8192),
@@ -52,9 +52,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
@@ -127,6 +128,10 @@ func (d dissecting) Macros() map[string]string {
}
}
func (d dissecting) NewResponseRequestMatcher() api.RequestResponseMatcher {
return createResponseRequestMatcher()
}
var Dissector dissecting
func NewDissector() api.Dissector {

View File

@@ -7,16 +7,17 @@ import (
"github.com/up9inc/mizu/tap/api"
)
var reqResMatcher = createResponseRequestMatcher() // global
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
// Key is `{src_ip}_{dst_ip}_{src_ip}_{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return *newMatcher
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {

View File

@@ -210,6 +210,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
streamsMap: streamsMap,
}
cleaner.start()

View File

@@ -47,6 +47,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
@@ -94,7 +95,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -29,8 +29,9 @@ type tcpStreamFactory struct {
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
stream *tcpStream
reqResMatcher api.RequestResponseMatcher
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
@@ -81,8 +82,8 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
if stream.isTapTarget {
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}
@@ -103,6 +104,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
@@ -121,11 +123,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
})
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
stream: stream,
reqResMatcher: reqResMatcher,
createdAt: time.Now(),
})
factory.wg.Add(2)

View File

@@ -76,7 +76,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const scrollableRef = useRef(null);
const [openOasModal, setOpenOasModal] = useState(false);
const handleOpenModal = () => setOpenOasModal(true);
const handleCloseModal = () => setOpenOasModal(false);
const [showTLSWarning, setShowTLSWarning] = useState(false);
@@ -258,8 +258,14 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
}
}
const handleOpenOasModal = () => {
ws.current.close();
setOpenOasModal(true);
}
const openServiceMapModalDebounce = debounce(() => {
setServiceMapModalOpen(true)
ws.current.close();
setServiceMapModalOpen(true);
}, 500);
return (
@@ -285,7 +291,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25 }}
onClick={handleOpenModal}
onClick={handleOpenOasModal}
>
Show OAS
</Button>}

View File

@@ -10,8 +10,7 @@ import debounce from 'lodash/debounce';
import ServiceMapOptions from './ServiceMapOptions'
import { useCommonStyles } from "../../helpers/commonStyle";
import refresh from "../assets/refresh.svg";
import reset from "../assets/reset.svg";
import close from "../assets/close.svg"
import close from "../assets/close.svg";
interface GraphData {
nodes: Node[];
@@ -154,19 +153,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
return () => setGraphData({ nodes: [], edges: [] })
}, [getServiceMapData])
const resetServiceMap = debounce(async () => {
try {
const serviceMapResetResponse = await api.serviceMapReset();
if (serviceMapResetResponse["status"] === "enabled") {
refreshServiceMap()
}
} catch (ex) {
toast.error("An error occurred while resetting Mizu Service Map, see console for mode details");
console.error(ex);
}
}, 500);
const refreshServiceMap = debounce(() => {
getServiceMapData();
}, 500);
@@ -192,16 +178,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
{!isLoading && <div style={{ height: "100%", width: "100%" }}>
<div style={{ display: "flex", justifyContent: "space-between" }}>
<div>
<Button
startIcon={<img src={reset} className="custom" alt="reset" style={{ marginRight:"8%"}}></img>}
size="large"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25, paddingTop: "3px", paddingBottom: "1px"}}
onClick={resetServiceMap}
>
Reset
</Button>
<Button
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight:"8%"}}></img>}
size="medium"

View File

@@ -1,4 +0,0 @@
<svg width="22" height="22" viewBox="0 0 22 22" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M11 14C12.114 14 13 13.1127 13 12C13 10.8873 12.114 10 11 10C9.886 10 9 10.8873 9 12C9 13.1127 9.886 14 11 14Z" fill="#205CF5"/>
<path d="M19.0823 10.2539C18.8662 9.1982 18.4442 8.19548 17.8402 7.30312C17.2467 6.42521 16.4906 5.66909 15.6127 5.07562C14.7202 4.47184 13.7175 4.04978 12.6619 3.83354C12.1074 3.72109 11.5429 3.6658 10.9771 3.66854V1.83337L7.33333 4.58337L10.9771 7.33337V5.50187C11.4208 5.50004 11.8644 5.54221 12.2925 5.63021C13.1129 5.79833 13.8923 6.12632 14.586 6.59546C15.2702 7.05676 15.859 7.64561 16.3203 8.32979C17.0366 9.38875 17.4185 10.6383 17.4167 11.9167C17.4165 12.7746 17.2451 13.6238 16.9125 14.4146C16.7507 14.7955 16.5531 15.1602 16.3222 15.5036C16.0904 15.845 15.827 16.1639 15.5357 16.456C14.6483 17.3417 13.5219 17.9492 12.2943 18.2041C11.4407 18.3765 10.5612 18.3765 9.7075 18.2041C8.88668 18.0358 8.10704 17.7075 7.41308 17.238C6.72969 16.7771 6.14148 16.1888 5.68058 15.5055C4.96518 14.4454 4.58306 13.1956 4.58333 11.9167H2.75C2.75098 13.5609 3.24215 15.1675 4.16075 16.5312C4.75461 17.4077 5.50996 18.163 6.38642 18.7569C7.74824 19.6786 9.3556 20.1697 11 20.1667C11.5585 20.1667 12.1156 20.1105 12.6628 19.999C13.7177 19.7811 14.7197 19.3592 15.6127 18.7569C16.0511 18.4615 16.4597 18.1241 16.8328 17.7495C17.2063 17.3749 17.5439 16.9661 17.8411 16.5285C18.762 15.167 19.2528 13.5604 19.25 11.9167C19.25 11.3582 19.1938 10.8011 19.0823 10.2539Z" fill="#205CF5"/>
</svg>

Before

Width:  |  Height:  |  Size: 1.5 KiB