mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-17 00:51:06 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0c8c0c192 | ||
|
|
1c18eb1b84 | ||
|
|
01d6005a7b | ||
|
|
4c97316c02 | ||
|
|
d66c7445e6 | ||
|
|
12ca3d8779 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -32,3 +32,6 @@ pprof/*
|
||||
# Database Files
|
||||
*.bin
|
||||
*.gob
|
||||
|
||||
# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
|
||||
nohup.*
|
||||
|
||||
@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
|
||||
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ require (
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73 h1:FJUM7w7E0jRGFPcSMa7cVy+jr5zcpbyT6qA30dEtGGI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c h1:GJsCVhDKjV/k3mNG255VN7hAQ7fxyNgX5T+VJyzoOQ0=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
|
||||
@@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
|
||||
|
||||
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
|
||||
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
||||
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-added:
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
added = nil
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
|
||||
logger.Log.Errorf("error parsing Mizu resource event: %+v", err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
@@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
|
||||
}
|
||||
|
||||
if event.Type == v1.EventTypeWarning {
|
||||
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
|
||||
}
|
||||
case wEvent, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
if startTime.After(event.CreationTimestamp.Time) {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Type == v1.EventTypeWarning {
|
||||
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
|
||||
}
|
||||
case wEvent, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
if startTime.After(event.CreationTimestamp.Time) {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Type == v1.EventTypeWarning {
|
||||
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
|
||||
logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
|
||||
@@ -55,7 +55,7 @@ func GetEntry(c *gin.Context) {
|
||||
}
|
||||
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
protocol, representation, bodySize, _ := extension.Dissector.Represent(entry.Protocol, entry.Request, entry.Response)
|
||||
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
|
||||
|
||||
var rules []map[string]interface{}
|
||||
var isRulesEnabled bool
|
||||
@@ -68,7 +68,7 @@ func GetEntry(c *gin.Context) {
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
|
||||
Protocol: protocol,
|
||||
Protocol: entry.Protocol,
|
||||
Representation: string(representation),
|
||||
BodySize: bodySize,
|
||||
Data: entry,
|
||||
|
||||
@@ -3,11 +3,12 @@ package apiserver
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared/kubernetes"
|
||||
@@ -61,7 +62,14 @@ func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) {
|
||||
if response, err := provider.client.Get(healthUrl); err != nil {
|
||||
return nil, err
|
||||
} else if response.StatusCode > 299 {
|
||||
return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode))
|
||||
responseBody := new(strings.Builder)
|
||||
|
||||
if _, err := io.Copy(responseBody, response.Body); err != nil {
|
||||
return nil, fmt.Errorf("status code: %d - (bad response - %v)", response.StatusCode, err)
|
||||
} else {
|
||||
singleLineResponse := strings.ReplaceAll(responseBody.String(), "\n", "")
|
||||
return nil, fmt.Errorf("status code: %d - (response - %v)", response.StatusCode, singleLineResponse)
|
||||
}
|
||||
} else {
|
||||
defer response.Body.Close()
|
||||
|
||||
|
||||
@@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
|
||||
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
|
||||
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||
isPodReady := false
|
||||
timeAfter := time.After(25 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-added:
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
added = nil
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, added")
|
||||
case _, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
case wEvent, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
modifiedPod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, err)
|
||||
switch wEvent.Type {
|
||||
case kubernetes.EventAdded:
|
||||
logger.Log.Debugf("Watching API Server pod loop, added")
|
||||
case kubernetes.EventDeleted:
|
||||
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodPending {
|
||||
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
return
|
||||
case kubernetes.EventModified:
|
||||
modifiedPod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, err)
|
||||
cancel()
|
||||
break
|
||||
continue
|
||||
}
|
||||
|
||||
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
}
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
if modifiedPod.Status.Phase == core.PodPending {
|
||||
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
|
||||
url := GetApiServerUrl()
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
}
|
||||
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
|
||||
url := GetApiServerUrl()
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
}
|
||||
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
}
|
||||
case kubernetes.EventBookmark:
|
||||
break
|
||||
case kubernetes.EventError:
|
||||
break
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
@@ -654,76 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
|
||||
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||
var prevPodPhase core.PodPhase
|
||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-added:
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
added = nil
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
addedPod, err := wEvent.ToPod()
|
||||
pod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, err)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
|
||||
case wEvent, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
switch wEvent.Type {
|
||||
case kubernetes.EventAdded:
|
||||
logger.Log.Debugf("Tapper is created [%s]", pod.Name)
|
||||
case kubernetes.EventDeleted:
|
||||
logger.Log.Debugf("Tapper is removed [%s]", pod.Name)
|
||||
case kubernetes.EventModified:
|
||||
if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message))
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
removedPod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, err)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
podStatus := pod.Status
|
||||
|
||||
|
||||
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
|
||||
case wEvent, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
modifiedPod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, err)
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
|
||||
podStatus := modifiedPod.Status
|
||||
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
|
||||
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
||||
continue
|
||||
}
|
||||
prevPodPhase = podStatus.Phase
|
||||
|
||||
if podStatus.Phase == core.PodRunning {
|
||||
state := podStatus.ContainerStatuses[0].State
|
||||
if state.Terminated != nil {
|
||||
switch state.Terminated.Reason {
|
||||
case "OOMKilled":
|
||||
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
|
||||
if podStatus.Phase == core.PodRunning {
|
||||
state := podStatus.ContainerStatuses[0].State
|
||||
if state.Terminated != nil {
|
||||
switch state.Terminated.Reason {
|
||||
case "OOMKilled":
|
||||
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
||||
logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase)))
|
||||
case kubernetes.EventBookmark:
|
||||
break
|
||||
case kubernetes.EventError:
|
||||
break
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
@@ -746,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
|
||||
|
||||
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
|
||||
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
||||
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-added:
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
added = nil
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
|
||||
cancel()
|
||||
}
|
||||
|
||||
if startTime.After(event.CreationTimestamp.Time) {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Type == core.EventTypeWarning {
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
|
||||
}
|
||||
case wEvent, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
|
||||
cancel()
|
||||
}
|
||||
|
||||
if startTime.After(event.CreationTimestamp.Time) {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Type == core.EventTypeWarning {
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
|
||||
}
|
||||
case wEvent, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
event, err := wEvent.ToEvent()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err))
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
||||
@@ -37,8 +37,8 @@ COPY agent .
|
||||
RUN go build -gcflags="all=-N -l" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
|
||||
|
||||
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
||||
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
|
||||
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
|
||||
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
|
||||
|
||||
restartTappers := func() {
|
||||
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
|
||||
@@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-added:
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
added = nil
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
||||
}
|
||||
|
||||
|
||||
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case wEvent, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
pod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case wEvent, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
pod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||
// - Pod deletion
|
||||
// - Pod reaches start state
|
||||
// - Pod reaches ready state
|
||||
// Ready/unready transitions might also trigger this event.
|
||||
if pod.Status.PodIP != "" {
|
||||
switch wEvent.Type {
|
||||
case EventAdded:
|
||||
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case EventDeleted:
|
||||
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case EventModified:
|
||||
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||
// - Pod deletion
|
||||
// - Pod reaches start state
|
||||
// - Pod reaches ready state
|
||||
// Ready/unready transitions might also trigger this event.
|
||||
if pod.Status.PodIP != "" {
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
case EventBookmark:
|
||||
break
|
||||
case EventError:
|
||||
break
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
|
||||
@@ -20,10 +20,8 @@ type WatchCreator interface {
|
||||
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
|
||||
}
|
||||
|
||||
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
|
||||
addedChan := make(chan *WatchEvent)
|
||||
modifiedChan := make(chan *WatchEvent)
|
||||
removedChan := make(chan *WatchEvent)
|
||||
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
|
||||
eventChan := make(chan *WatchEvent)
|
||||
errorChan := make(chan error)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
|
||||
break
|
||||
}
|
||||
|
||||
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
|
||||
err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
|
||||
watcher.Stop()
|
||||
|
||||
select {
|
||||
@@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
wg.Wait()
|
||||
close(addedChan)
|
||||
close(modifiedChan)
|
||||
close(removedChan)
|
||||
close(eventChan)
|
||||
close(errorChan)
|
||||
}()
|
||||
|
||||
return addedChan, modifiedChan, removedChan, errorChan
|
||||
return eventChan, errorChan
|
||||
}
|
||||
|
||||
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
|
||||
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
|
||||
resultChan := watcher.ResultChan()
|
||||
for {
|
||||
select {
|
||||
@@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
|
||||
continue
|
||||
}
|
||||
|
||||
switch wEvent.Type {
|
||||
case watch.Added:
|
||||
addedChan <- &wEvent
|
||||
case watch.Modified:
|
||||
modifiedChan <- &wEvent
|
||||
case watch.Deleted:
|
||||
removedChan <- &wEvent
|
||||
}
|
||||
eventChan <- &wEvent
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -10,6 +10,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
EventAdded watch.EventType = watch.Added
|
||||
EventModified watch.EventType = watch.Modified
|
||||
EventDeleted watch.EventType = watch.Deleted
|
||||
EventBookmark watch.EventType = watch.Bookmark
|
||||
EventError watch.EventType = watch.Error
|
||||
)
|
||||
|
||||
type InvalidObjectType struct {
|
||||
RequestedType reflect.Type
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
var Log = logging.MustGetLogger("mizu")
|
||||
|
||||
var format = logging.MustStringFormatter(
|
||||
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
|
||||
`[%{time:2006-01-02T15:04:05.000-0700}] %{level:-5s} ▶ %{message} ▶ [%{pid} %{shortfile} %{shortfunc}]`,
|
||||
)
|
||||
|
||||
func InitLogger(logPath string) {
|
||||
|
||||
@@ -99,7 +99,7 @@ type Dissector interface {
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
|
||||
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *MizuEntry
|
||||
Summarize(entry *MizuEntry) *BaseEntryDetails
|
||||
Represent(pIn Protocol, request map[string]interface{}, response map[string]interface{}) (pOut Protocol, object []byte, bodySize int64, err error)
|
||||
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
|
||||
Macros() map[string]string
|
||||
}
|
||||
|
||||
|
||||
@@ -325,8 +325,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
|
||||
protoOut = protocol
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{}, 0)
|
||||
var repRequest []interface{}
|
||||
@@ -363,7 +362,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`amqp`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
|
||||
`amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@ func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *ap
|
||||
emitter.Emit(item)
|
||||
}
|
||||
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
||||
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -73,7 +73,11 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
|
||||
}
|
||||
|
||||
if item != nil {
|
||||
item.Protocol = http2Protocol
|
||||
if isGrpc {
|
||||
item.Protocol = grpcProtocol
|
||||
} else {
|
||||
item.Protocol = http2Protocol
|
||||
}
|
||||
filterAndEmit(item, emitter, options)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
@@ -27,6 +28,26 @@ const protoMinorHTTP2 = 0
|
||||
|
||||
var maxHTTP2DataLen = 1 * 1024 * 1024 // 1MB
|
||||
|
||||
var grpcStatusCodes = []string{
|
||||
"OK",
|
||||
"CANCELLED",
|
||||
"UNKNOWN",
|
||||
"INVALID_ARGUMENT",
|
||||
"DEADLINE_EXCEEDED",
|
||||
"NOT_FOUND",
|
||||
"ALREADY_EXISTS",
|
||||
"PERMISSION_DENIED",
|
||||
"RESOURCE_EXHAUSTED",
|
||||
"FAILED_PRECONDITION",
|
||||
"ABORTED",
|
||||
"OUT_OF_RANGE",
|
||||
"UNIMPLEMENTED",
|
||||
"INTERNAL",
|
||||
"UNAVAILABLE",
|
||||
"DATA_LOSS",
|
||||
"UNAUTHENTICATED",
|
||||
}
|
||||
|
||||
type messageFragment struct {
|
||||
headers []hpack.HeaderField
|
||||
data []byte
|
||||
@@ -71,37 +92,38 @@ func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte)
|
||||
return headers, data
|
||||
}
|
||||
|
||||
func createGrpcAssembler(b *bufio.Reader) *GrpcAssembler {
|
||||
func createHTTP2Assembler(b *bufio.Reader) *Http2Assembler {
|
||||
var framerOutput bytes.Buffer
|
||||
framer := http2.NewFramer(&framerOutput, b)
|
||||
framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
|
||||
return &GrpcAssembler{
|
||||
return &Http2Assembler{
|
||||
fragmentsByStream: make(fragmentsByStream),
|
||||
framer: framer,
|
||||
}
|
||||
}
|
||||
|
||||
type GrpcAssembler struct {
|
||||
type Http2Assembler struct {
|
||||
fragmentsByStream fragmentsByStream
|
||||
framer *http2.Framer
|
||||
}
|
||||
|
||||
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
|
||||
func (ga *Http2Assembler) readMessage() (streamID uint32, messageHTTP1 interface{}, isGrpc bool, err error) {
|
||||
// Exactly one Framer is used for each half connection.
|
||||
// (Instead of creating a new Framer for each ReadFrame operation)
|
||||
// This is needed in order to decompress the headers,
|
||||
// because the compression context is updated with each requests/response.
|
||||
frame, err := ga.framer.ReadFrame()
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
return
|
||||
}
|
||||
|
||||
streamID := frame.Header().StreamID
|
||||
streamID = frame.Header().StreamID
|
||||
|
||||
ga.fragmentsByStream.appendFrame(streamID, frame)
|
||||
|
||||
if !(ga.isStreamEnd(frame)) {
|
||||
return 0, nil, nil
|
||||
streamID = 0
|
||||
return
|
||||
}
|
||||
|
||||
headers, data := ga.fragmentsByStream.pop(streamID)
|
||||
@@ -115,13 +137,29 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
|
||||
dataString := base64.StdEncoding.EncodeToString(data)
|
||||
|
||||
// Use http1 types only because they are expected in http_matcher.
|
||||
// TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerRequest
|
||||
// to accept both HTTP/1.x and HTTP/2 requests and responses
|
||||
var messageHTTP1 interface{}
|
||||
if _, ok := headersHTTP1[":method"]; ok {
|
||||
method := headersHTTP1.Get(":method")
|
||||
status := headersHTTP1.Get(":status")
|
||||
|
||||
// gRPC detection
|
||||
grpcStatus := headersHTTP1.Get("Grpc-Status")
|
||||
if grpcStatus != "" {
|
||||
isGrpc = true
|
||||
status = grpcStatus
|
||||
}
|
||||
|
||||
if strings.Contains(headersHTTP1.Get("Content-Type"), "application/grpc") {
|
||||
isGrpc = true
|
||||
grpcPath := headersHTTP1.Get(":path")
|
||||
pathSegments := strings.Split(grpcPath, "/")
|
||||
if len(pathSegments) > 0 {
|
||||
method = pathSegments[len(pathSegments)-1]
|
||||
}
|
||||
}
|
||||
|
||||
if method != "" {
|
||||
messageHTTP1 = http.Request{
|
||||
URL: &url.URL{},
|
||||
Method: "POST",
|
||||
Method: method,
|
||||
Header: headersHTTP1,
|
||||
Proto: protoHTTP2,
|
||||
ProtoMajor: protoMajorHTTP2,
|
||||
@@ -129,8 +167,16 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
|
||||
Body: io.NopCloser(strings.NewReader(dataString)),
|
||||
ContentLength: int64(len(dataString)),
|
||||
}
|
||||
} else if _, ok := headersHTTP1[":status"]; ok {
|
||||
} else if status != "" {
|
||||
var statusCode int
|
||||
|
||||
statusCode, err = strconv.Atoi(status)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
messageHTTP1 = http.Response{
|
||||
StatusCode: statusCode,
|
||||
Header: headersHTTP1,
|
||||
Proto: protoHTTP2,
|
||||
ProtoMajor: protoMajorHTTP2,
|
||||
@@ -139,13 +185,14 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
|
||||
ContentLength: int64(len(dataString)),
|
||||
}
|
||||
} else {
|
||||
return 0, nil, errors.New("failed to assemble stream: neither a request nor a message")
|
||||
err = errors.New("failed to assemble stream: neither a request nor a message")
|
||||
return
|
||||
}
|
||||
|
||||
return streamID, messageHTTP1, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {
|
||||
func (ga *Http2Assembler) isStreamEnd(frame http2.Frame) bool {
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
if frame.StreamEnded() {
|
||||
@@ -23,21 +23,35 @@ var protocol api.Protocol = api.Protocol{
|
||||
ForegroundColor: "#ffffff",
|
||||
FontSize: 12,
|
||||
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc2616",
|
||||
Ports: []string{"80", "8080", "50051"},
|
||||
Ports: []string{"80", "443", "8080"},
|
||||
Priority: 0,
|
||||
}
|
||||
|
||||
var http2Protocol api.Protocol = api.Protocol{
|
||||
Name: "http",
|
||||
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) (gRPC)",
|
||||
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2)",
|
||||
Abbreviation: "HTTP/2",
|
||||
Macro: "grpc",
|
||||
Macro: "http2",
|
||||
Version: "2.0",
|
||||
BackgroundColor: "#244c5a",
|
||||
ForegroundColor: "#ffffff",
|
||||
FontSize: 11,
|
||||
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc7540",
|
||||
Ports: []string{"80", "8080"},
|
||||
Ports: []string{"80", "443", "8080"},
|
||||
Priority: 0,
|
||||
}
|
||||
|
||||
var grpcProtocol api.Protocol = api.Protocol{
|
||||
Name: "http",
|
||||
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) [ gRPC over HTTP/2 ]",
|
||||
Abbreviation: "gRPC",
|
||||
Macro: "grpc",
|
||||
Version: "2.0",
|
||||
BackgroundColor: "#244c5a",
|
||||
ForegroundColor: "#ffffff",
|
||||
FontSize: 11,
|
||||
ReferenceLink: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html",
|
||||
Ports: []string{"80", "443", "8080", "50051"},
|
||||
Priority: 0,
|
||||
}
|
||||
|
||||
@@ -64,10 +78,10 @@ func (d dissecting) Ping() {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||
|
||||
var grpcAssembler *GrpcAssembler
|
||||
var http2Assembler *Http2Assembler
|
||||
if isHTTP2 {
|
||||
prepareHTTP2Connection(b, isClient)
|
||||
grpcAssembler = createGrpcAssembler(b)
|
||||
http2Assembler = createHTTP2Assembler(b)
|
||||
}
|
||||
|
||||
dissected := false
|
||||
@@ -77,7 +91,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
if isHTTP2 {
|
||||
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
|
||||
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -110,17 +124,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetHostname(address, newHostname string) string {
|
||||
replacedUrl, err := url.Parse(address)
|
||||
if err != nil {
|
||||
return address
|
||||
}
|
||||
replacedUrl.Host = newHostname
|
||||
return replacedUrl.String()
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.MizuEntry {
|
||||
var host, scheme, authority, path, service string
|
||||
var host, authority, path, service string
|
||||
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
response := item.Pair.Response.Payload.(map[string]interface{})
|
||||
@@ -135,9 +140,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
if h["name"] == ":authority" {
|
||||
authority = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":scheme" {
|
||||
scheme = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":path" {
|
||||
path = h["value"].(string)
|
||||
}
|
||||
@@ -148,9 +150,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
|
||||
if item.Protocol.Version == "2.0" {
|
||||
service = fmt.Sprintf("%s://%s", scheme, authority)
|
||||
service = authority
|
||||
} else {
|
||||
service = fmt.Sprintf("http://%s", host)
|
||||
service = host
|
||||
u, err := url.Parse(reqDetails["url"].(string))
|
||||
if err != nil {
|
||||
path = reqDetails["url"].(string)
|
||||
@@ -178,9 +180,20 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryString"].([]interface{}))
|
||||
|
||||
if resolvedDestination != "" {
|
||||
service = SetHostname(service, resolvedDestination)
|
||||
service = resolvedDestination
|
||||
} else if resolvedSource != "" {
|
||||
service = SetHostname(service, resolvedSource)
|
||||
service = resolvedSource
|
||||
}
|
||||
|
||||
method := reqDetails["method"].(string)
|
||||
statusCode := int(resDetails["status"].(float64))
|
||||
if item.Protocol.Abbreviation == "gRPC" {
|
||||
resDetails["statusText"] = grpcStatusCodes[statusCode]
|
||||
}
|
||||
|
||||
if item.Protocol.Version == "2.0" {
|
||||
reqDetails["url"] = path
|
||||
request["url"] = path
|
||||
}
|
||||
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
@@ -188,10 +201,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
elapsedTime = 0
|
||||
}
|
||||
httpPair, _ := json.Marshal(item.Pair)
|
||||
_protocol := protocol
|
||||
_protocol.Version = item.Protocol.Version
|
||||
return &api.MizuEntry{
|
||||
Protocol: _protocol,
|
||||
Protocol: item.Protocol,
|
||||
Source: &api.TCP{
|
||||
Name: resolvedSource,
|
||||
IP: item.ConnectionInfo.ClientIP,
|
||||
@@ -206,8 +217,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
Url: fmt.Sprintf("%s%s", service, path),
|
||||
Method: reqDetails["method"].(string),
|
||||
Status: int(resDetails["status"].(float64)),
|
||||
Method: method,
|
||||
Status: statusCode,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
@@ -226,15 +237,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
var p api.Protocol
|
||||
if entry.Protocol.Version == "2.0" {
|
||||
p = http2Protocol
|
||||
} else {
|
||||
p = protocol
|
||||
}
|
||||
return &api.BaseEntryDetails{
|
||||
Id: entry.Id,
|
||||
Protocol: p,
|
||||
Protocol: entry.Protocol,
|
||||
Url: entry.Url,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
Service: entry.Service,
|
||||
@@ -408,12 +413,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
|
||||
return
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
|
||||
if protoIn.Version == "2.0" {
|
||||
protoOut = http2Protocol
|
||||
} else {
|
||||
protoOut = protocol
|
||||
}
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
representation := make(map[string]interface{}, 0)
|
||||
repRequest := representRequest(request)
|
||||
repResponse, bodySize := representResponse(response)
|
||||
@@ -425,9 +425,9 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
|
||||
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
|
||||
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
|
||||
`http`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, protocol.Version),
|
||||
`http2`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, http2Protocol.Version),
|
||||
`grpc`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s" and proto.macro == "%s"`, protocol.Name, grpcProtocol.Version, grpcProtocol.Macro),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -210,8 +210,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
|
||||
protoOut = _protocol
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{}, 0)
|
||||
|
||||
@@ -258,7 +257,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`kafka`: fmt.Sprintf(`proto.abbr == "%s"`, _protocol.Abbreviation),
|
||||
`kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -146,8 +146,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
|
||||
protoOut = protocol
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{}, 0)
|
||||
repRequest := representGeneric(request, `request.`)
|
||||
@@ -160,7 +159,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`redis`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
|
||||
`redis`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,8 @@ import (
|
||||
"github.com/up9inc/mizu/tap/source"
|
||||
)
|
||||
|
||||
const PACKETS_SEEN_LOG_THRESHOLD = 1000
|
||||
|
||||
type tcpAssembler struct {
|
||||
*reassembly.Assembler
|
||||
streamPool *reassembly.StreamPool
|
||||
@@ -63,7 +65,11 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
|
||||
|
||||
for packetInfo := range packets {
|
||||
packetsCount := diagnose.AppStats.IncPacketsCount()
|
||||
logger.Log.Debugf("PACKET #%d", packetsCount)
|
||||
|
||||
if packetsCount % PACKETS_SEEN_LOG_THRESHOLD == 0 {
|
||||
logger.Log.Debugf("Packets seen: #%d", packetsCount)
|
||||
}
|
||||
|
||||
packet := packetInfo.Packet
|
||||
data := packet.Data()
|
||||
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
|
||||
@@ -85,7 +91,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
}
|
||||
diagnose.InternalStats.Totalsz += len(tcp.Payload)
|
||||
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
logger.Log.Debugf("%s:%v -> %s:%v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
a.assemblerMutex.Lock()
|
||||
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
a.assemblerMutex.Unlock()
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
border: solid 1px $secondary-font-color
|
||||
margin-left: 4px
|
||||
padding: 2px 5px
|
||||
text-transform: uppercase
|
||||
font-family: "Source Sans Pro", sans-serif
|
||||
font-size: 11px
|
||||
font-weight: bold
|
||||
|
||||
Reference in New Issue
Block a user