Compare commits

...

7 Commits

Author SHA1 Message Date
gadotroee
2a31739100 Fix acceptance test (body size and right side pane changes) (#907) 2022-03-22 09:56:34 +02:00
M. Mert Yıldıran
308fa78955 TRA-4383 Calculate request and response sizes and display them instead of BodySize field (#897)
* Define `ReadProgress` struct and update `Dissector` interface such that the `bufio.Reader` progress can be learned on item emitting

* Display the `requestSize` and `responseSize` fields in the UI

* Update the tests

* publish ui-common version 1.0.130 and bump to this version in ui/package.json file

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2022-03-21 19:34:59 +02:00
RoyUP9
cff5987ed4 Added check pre install (#905) 2022-03-21 17:19:04 +02:00
Adam Kol
7893b4596d closing ws on modal open (#904)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 16:44:40 +02:00
M. Mert Yıldıran
774f07fccd Add /db/flush and /db/reset API endpoints (#896)
* Add `/db/flush` and `/db/reset` API endpoints

* Handle the unmarshalling errors better in the WebSocket

* Handle Basenine connection error better in the WebSocket

* Upgrade to Basenine `v0.6.5`

* Fix the duplicated `StartTime` state

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 15:54:36 +02:00
RoyUP9
482e5c8b69 Added check pull image flag (#899)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 15:24:03 +02:00
RamiBerm
21902b5f86 Fix tapping status falling out of sync (#898)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 14:54:25 +02:00
55 changed files with 801 additions and 589 deletions

View File

@@ -77,8 +77,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.6.5/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.6.5/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
RUN chmod +x ./basenine_linux_${GOARCH}
RUN mv ./basenine_linux_${GOARCH} ./basenine

View File

@@ -142,7 +142,9 @@ function deepCheck(generalDict, protocolDict, methodDict, entry) {
if (value) {
if (value.tab === valueTabs.response)
cy.contains('Response').click();
// temporary fix, change to some "data-cy" attribute,
// this will fix the issue that happen because we have "response:" in the header of the right side
cy.get('#rightSideContainer > :nth-child(3)').contains('Response').click();
cy.get(Cypress.env('bodyJsonClass')).then(text => {
expect(text.text()).to.match(value.regex)
});

View File

@@ -40,32 +40,23 @@ it('filtering guide check', function () {
});
it('right side sanity test', function () {
cy.get('#entryDetailedTitleBodySize').then(sizeTopLine => {
const sizeOnTopLine = sizeTopLine.text().replace(' B', '');
cy.contains('Response').click();
cy.contains('Body Size (bytes)').parent().next().then(size => {
const bodySizeByDetails = size.text();
expect(sizeOnTopLine).to.equal(bodySizeByDetails, 'The body size in the top line should match the details in the response');
cy.get('#entryDetailedTitleElapsedTime').then(timeInMs => {
const time = timeInMs.text();
if (time < '0ms') {
throw new Error(`The time in the top line cannot be negative ${time}`);
}
});
if (parseInt(bodySizeByDetails) < 0) {
throw new Error(`The body size cannot be negative. got the size: ${bodySizeByDetails}`)
}
// temporary fix, change to some "data-cy" attribute,
// this will fix the issue that happen because we have "response:" in the header of the right side
cy.get('#rightSideContainer > :nth-child(3)').contains('Response').click();
cy.get('#entryDetailedTitleElapsedTime').then(timeInMs => {
const time = timeInMs.text();
if (time < '0ms') {
throw new Error(`The time in the top line cannot be negative ${time}`);
}
cy.get('#rightSideContainer [title="Status Code"]').then(status => {
const statusCode = status.text();
cy.contains('Status').parent().next().then(statusInDetails => {
const statusCodeInDetails = statusInDetails.text();
cy.get('#rightSideContainer [title="Status Code"]').then(status => {
const statusCode = status.text();
cy.contains('Status').parent().next().then(statusInDetails => {
const statusCodeInDetails = statusInDetails.text();
expect(statusCode).to.equal(statusCodeInDetails, 'The status code in the top line should match the status code in details');
});
});
});
expect(statusCode).to.equal(statusCodeInDetails, 'The status code in the top line should match the status code in details');
});
});
});
@@ -252,7 +243,9 @@ function deeperChcek(leftSidePath, rightSidePath, filterName, leftSideExpectedTe
}
function checkRightSideResponseBody() {
cy.contains('Response').click();
// temporary fix, change to some "data-cy" attribute,
// this will fix the issue that happen because we have "response:" in the header of the right side
cy.get('#rightSideContainer > :nth-child(3)').contains('Response').click();
clickCheckbox('Decode Base64');
cy.get(`${Cypress.env('bodyJsonClass')}`).then(value => {

View File

@@ -20,7 +20,7 @@ require (
github.com/orcaman/concurrent-map v1.0.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.0
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -655,8 +655,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6 h1:c0aVbLKYeFDAg246+NDgie2y484bsc20NaKLo8ODV3E=
github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw=

View File

@@ -47,7 +47,6 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP
var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)")
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
var startTime int64
const (
socketConnectionRetries = 30
@@ -110,7 +109,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
api.WebSocketRoutes(app, &eventHandlers, startTime)
api.WebSocketRoutes(app, &eventHandlers)
if config.Config.OAS {
routes.OASRoutes(app)
@@ -124,6 +123,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin
routes.EntriesRoutes(app)
routes.MetadataRoutes(app)
routes.StatusRoutes(app)
routes.DbRoutes(app)
return app
}
@@ -133,7 +133,6 @@ func runInApiServerMode(namespace string) *gin.Engine {
logger.Log.Fatalf("Error loading config file %v", err)
}
app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort, config.Config.MaxDBSizeBytes, config.Config.LogLevel, config.Config.InsertionFilter)
startTime = time.Now().UnixNano() / int64(time.Millisecond)
api.StartResolving(namespace)
enableExpFeatureIfNeeded()

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -59,13 +60,13 @@ func init() {
connectedWebsockets = make(map[int]*SocketConnection)
}
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
SocketGetBrowserHandler = func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime)
websocketHandler(c.Writer, c.Request, eventHandlers, false)
}
SocketGetTapperHandler = func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
websocketHandler(c.Writer, c.Request, eventHandlers, true)
}
app.GET("/ws", func(c *gin.Context) {
@@ -77,7 +78,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int
})
}
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool, startTime int64) {
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
ws, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
@@ -99,7 +100,9 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
if !isTapper {
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
socketCleanup(socketId, connectedWebsockets[socketId])
return
}
}
@@ -115,7 +118,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
eventHandlers.WebSocketConnect(socketId, isTapper)
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(startTime)
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(utils.StartTime)
if err = SendToSocket(socketId, startTimeBytes); err != nil {
logger.Log.Error(err)
@@ -137,7 +140,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
if !isTapper && !isQuerySet {
if err := json.Unmarshal(msg, &params); err != nil {
logger.Log.Errorf("Error: %v", socketId, err)
logger.Log.Errorf("Error unmarshalling parameters: %v", socketId, err)
continue
}
query := params.Query
@@ -166,6 +170,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
var entry *tapApi.Entry
err = json.Unmarshal(bytes, &entry)
if err != nil {
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
continue
}
var message []byte
if params.EnableFullEntries {
@@ -193,7 +201,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
continue
}
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)

View File

@@ -9,7 +9,7 @@ import (
"github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
@@ -59,7 +59,6 @@ func LoadExtensions() {
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
})
controllers.InitExtensionsMap(ExtensionsMap)
api.InitExtensionsMap(ExtensionsMap)
}
@@ -92,6 +91,8 @@ func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel lo
if err := basenine.InsertionFilter(host, port, insertionFilter); err != nil {
logger.Log.Errorf("Error while setting the insertion filter: %v", err)
}
utils.StartTime = time.Now().UnixNano() / int64(time.Millisecond)
}
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {

View File

@@ -0,0 +1,28 @@
package controllers
import (
"net/http"
"github.com/gin-gonic/gin"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/shared"
)
func Flush(c *gin.Context) {
if err := basenine.Flush(shared.BasenineHost, shared.BaseninePort); err != nil {
c.JSON(http.StatusBadRequest, err)
} else {
c.JSON(http.StatusOK, "Flushed.")
}
}
func Reset(c *gin.Context) {
if err := basenine.Reset(shared.BasenineHost, shared.BaseninePort); err != nil {
c.JSON(http.StatusBadRequest, err)
} else {
app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort, config.Config.MaxDBSizeBytes, config.Config.LogLevel, config.Config.InsertionFilter)
c.JSON(http.StatusOK, "Resetted.")
}
}

View File

@@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/validation"
@@ -18,12 +19,6 @@ import (
tapApi "github.com/up9inc/mizu/tap/api"
)
var extensionsMap map[string]*tapApi.Extension // global
func InitExtensionsMap(ref map[string]*tapApi.Extension) {
extensionsMap = ref
}
func Error(c *gin.Context, err error) bool {
if err != nil {
logger.Log.Errorf("Error getting entry: %v", err)
@@ -77,7 +72,7 @@ func GetEntries(c *gin.Context) {
return // exit
}
extension := extensionsMap[entry.Protocol.Name]
extension := app.ExtensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry)
dataSlice = append(dataSlice, base)
@@ -123,9 +118,19 @@ func GetEntry(c *gin.Context) {
return // exit
}
extension := extensionsMap[entry.Protocol.Name]
extension := app.ExtensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
var representation []byte
representation, err = extension.Dissector.Represent(entry.Request, entry.Response)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": err.Error(),
})
return // exit
}
var rules []map[string]interface{}
var isRulesEnabled bool
@@ -142,7 +147,6 @@ func GetEntry(c *gin.Context) {
c.JSON(http.StatusOK, tapApi.EntryWrapper{
Protocol: entry.Protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entry,
Base: base,
Rules: rules,

View File

@@ -0,0 +1,15 @@
package routes
import (
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/gin-gonic/gin"
)
// DdRoutes defines the group of database routes.
func DbRoutes(app *gin.Engine) {
routeGroup := app.Group("/db")
routeGroup.GET("/flush", controllers.Flush)
routeGroup.GET("/reset", controllers.Reset)
}

View File

@@ -17,6 +17,10 @@ import (
"github.com/up9inc/mizu/shared/logger"
)
var (
StartTime int64 // global
)
// StartServer starts the server with a graceful shutdown
func StartServer(app *gin.Engine) {
signals := make(chan os.Signal, 2)

View File

@@ -27,4 +27,6 @@ func init() {
}
checkCmd.Flags().Bool(configStructs.PreTapCheckName, defaultCheckConfig.PreTap, "Check pre-tap Mizu installation for potential problems")
checkCmd.Flags().Bool(configStructs.PreInstallCheckName, defaultCheckConfig.PreInstall, "Check pre-install Mizu installation for potential problems")
checkCmd.Flags().Bool(configStructs.ImagePullCheckName, defaultCheckConfig.ImagePull, "Test connectivity to container image registry by creating and removing a temporary pod in 'default' namespace")
}

View File

@@ -0,0 +1,102 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"time"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nimage-pull-in-cluster\n--------------------")
namespace := "default"
podName := "mizu-test"
defer func() {
if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
logger.Log.Errorf("%v error while removing test pod in cluster, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
}
}()
if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
logger.Log.Errorf("%v error while creating test pod in cluster, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
logger.Log.Errorf("%v cluster is not able to pull mizu containers from docker hub, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v cluster is able to pull mizu containers from docker hub", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: "up9inc/busybox",
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,31 @@
package check
import (
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
func KubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------")
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath(), config.Config.KubeContext)
if err != nil {
logger.Log.Errorf("%v can't initialize the client, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can initialize the client", fmt.Sprintf(uiUtils.Green, "√"))
kubernetesVersion, err := kubernetesProvider.GetKubernetesVersion()
if err != nil {
logger.Log.Errorf("%v can't query the Kubernetes API, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can query the Kubernetes API", fmt.Sprintf(uiUtils.Green, "√"))
return kubernetesProvider, kubernetesVersion, true
}

View File

@@ -0,0 +1,131 @@
package check
import (
"context"
"embed"
"fmt"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"strings"
)
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nkubernetes-permissions\n--------------------")
var filePath string
if config.Config.IsNsRestrictedMode() {
filePath = "permissionFiles/permissions-ns-tap.yaml"
} else {
filePath = "permissionFiles/permissions-all-namespaces-tap.yaml"
}
data, err := embedFS.ReadFile(filePath)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
switch resource := obj.(type) {
case *rbac.Role:
return checkRulesPermissions(ctx, kubernetesProvider, resource.Rules, config.Config.MizuResourcesNamespace)
case *rbac.ClusterRole:
return checkRulesPermissions(ctx, kubernetesProvider, resource.Rules, "")
}
logger.Log.Errorf("%v error while checking kubernetes permissions, err: resource of type 'Role' or 'ClusterRole' not found in permission files", fmt.Sprintf(uiUtils.Red, "✗"))
return false
}
func InstallKubernetesPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nkubernetes-permissions\n--------------------")
bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
resourcesTemplate := strings.Split(installTemplate, "---")[1:]
permissionsExist := true
decode := scheme.Codecs.UniversalDeserializer().Decode
for _, resourceTemplate := range resourcesTemplate {
obj, _, err := decode([]byte(resourceTemplate), nil, nil)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
groupVersionKind := obj.GetObjectKind().GroupVersionKind()
resource := fmt.Sprintf("%vs", strings.ToLower(groupVersionKind.Kind))
permissionsExist = checkCreatePermission(ctx, kubernetesProvider, resource, groupVersionKind.Group, obj.(metav1.Object).GetNamespace()) && permissionsExist
switch resourceObj := obj.(type) {
case *rbac.Role:
permissionsExist = checkRulesPermissions(ctx, kubernetesProvider, resourceObj.Rules, resourceObj.Namespace) && permissionsExist
case *rbac.ClusterRole:
permissionsExist = checkRulesPermissions(ctx, kubernetesProvider, resourceObj.Rules, "") && permissionsExist
}
}
return permissionsExist
}
func checkCreatePermission(ctx context.Context, kubernetesProvider *kubernetes.Provider, resource string, group string, namespace string) bool {
exist, err := kubernetesProvider.CanI(ctx, namespace, resource, "create", group)
return checkPermissionExist(group, resource, "create", namespace, exist, err)
}
func checkRulesPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule, namespace string) bool {
permissionsExist := true
for _, rule := range rules {
for _, group := range rule.APIGroups {
for _, resource := range rule.Resources {
for _, verb := range rule.Verbs {
exist, err := kubernetesProvider.CanI(ctx, namespace, resource, verb, group)
permissionsExist = checkPermissionExist(group, resource, verb, namespace, exist, err) && permissionsExist
}
}
}
}
return permissionsExist
}
func checkPermissionExist(group string, resource string, verb string, namespace string, exist bool, err error) bool {
var groupAndNamespace string
if group != "" && namespace != "" {
groupAndNamespace = fmt.Sprintf("in group '%v' and namespace '%v'", group, namespace)
} else if group != "" {
groupAndNamespace = fmt.Sprintf("in group '%v'", group)
} else if namespace != "" {
groupAndNamespace = fmt.Sprintf("in namespace '%v'", namespace)
}
if err != nil {
logger.Log.Errorf("%v error checking permission for %v %v %v, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, groupAndNamespace, err)
return false
} else if !exist {
logger.Log.Errorf("%v can't %v %v %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, groupAndNamespace)
return false
}
logger.Log.Infof("%v can %v %v %v", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, groupAndNamespace)
return true
}

View File

@@ -0,0 +1,95 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nk8s-components\n--------------------")
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist
if config.Config.IsNsRestrictedMode() {
exist, err = kubernetesProvider.DoesRoleExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName)
allResourcesExist = checkResourceExist(kubernetes.RoleName, "role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesRoleBindingExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.RoleBindingName, "role binding", exist, err) && allResourcesExist
} else {
exist, err = kubernetesProvider.DoesClusterRoleExist(ctx, kubernetes.ClusterRoleName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleName, "cluster role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesClusterRoleBindingExist(ctx, kubernetes.ClusterRoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleBindingName, "cluster role binding", exist, err) && allResourcesExist
}
exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
allResourcesExist = checkResourceExist(kubernetes.ApiServerPodName, "service", exist, err) && allResourcesExist
allResourcesExist = checkPodResourcesExist(ctx, kubernetesProvider) && allResourcesExist
return allResourcesExist
}
func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
return false
} else if len(pods) == 0 {
logger.Log.Errorf("%v '%v' pod doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
} else if !kubernetes.IsPodRunning(&pods[0]) {
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
}
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pods are running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
return false
} else {
tappers := 0
notRunningTappers := 0
for _, pod := range pods {
tappers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
}
}
if notRunningTappers > 0 {
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
return false
}
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
return true
}
}
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking if '%v' %v exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType, err)
return false
} else if !exist {
logger.Log.Errorf("%v '%v' %v doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType)
return false
}
logger.Log.Infof("%v '%v' %v exists", fmt.Sprintf(uiUtils.Green, "√"), resourceName, resourceType)
return true
}

View File

@@ -0,0 +1,21 @@
package check
import (
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
func KubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
logger.Log.Infof("\nkubernetes-version\n--------------------")
if err := kubernetes.ValidateKubernetesVersion(kubernetesVersion); err != nil {
logger.Log.Errorf("%v not running the minimum Kubernetes API version, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v is running the minimum Kubernetes API version", fmt.Sprintf(uiUtils.Green, "√"))
return true
}

View File

@@ -0,0 +1,83 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"regexp"
)
func ServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
serverUrl := fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err == nil {
logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
connectedToApiServer := false
if err := checkProxy(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√"))
}
if err := checkPortForward(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√"))
}
return connectedToApiServer
}
func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err)
}
return nil
}
func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
forwarder.Close()
return nil
}

View File

@@ -4,20 +4,10 @@ import (
"context"
"embed"
"fmt"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"regexp"
"time"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/check"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
var (
@@ -31,27 +21,33 @@ func runMizuCheck() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
kubernetesProvider, kubernetesVersion, checkPassed := checkKubernetesApi()
kubernetesProvider, kubernetesVersion, checkPassed := check.KubernetesApi()
if checkPassed {
checkPassed = checkKubernetesVersion(kubernetesVersion)
checkPassed = check.KubernetesVersion(kubernetesVersion)
}
if config.Config.Check.PreTap {
if checkPassed {
checkPassed = checkK8sTapPermissions(ctx, kubernetesProvider)
checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
} else if config.Config.Check.PreInstall {
if checkPassed {
checkPassed = checkImagePullInCluster(ctx, kubernetesProvider)
checkPassed = check.InstallKubernetesPermissions(ctx, kubernetesProvider)
}
} else {
if checkPassed {
checkPassed = checkK8sResources(ctx, kubernetesProvider)
checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = checkServerConnection(kubernetesProvider)
checkPassed = check.ServerConnection(kubernetesProvider)
}
}
if config.Config.Check.ImagePull {
if checkPassed {
checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
}
}
@@ -61,365 +57,3 @@ func runMizuCheck() {
logger.Log.Errorf("\nStatus check results are %v", fmt.Sprintf(uiUtils.Red, "✗"))
}
}
func checkKubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------")
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath(), config.Config.KubeContext)
if err != nil {
logger.Log.Errorf("%v can't initialize the client, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can initialize the client", fmt.Sprintf(uiUtils.Green, "√"))
kubernetesVersion, err := kubernetesProvider.GetKubernetesVersion()
if err != nil {
logger.Log.Errorf("%v can't query the Kubernetes API, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can query the Kubernetes API", fmt.Sprintf(uiUtils.Green, "√"))
return kubernetesProvider, kubernetesVersion, true
}
func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
logger.Log.Infof("\nkubernetes-version\n--------------------")
if err := kubernetes.ValidateKubernetesVersion(kubernetesVersion); err != nil {
logger.Log.Errorf("%v not running the minimum Kubernetes API version, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v is running the minimum Kubernetes API version", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
serverUrl := GetApiServerUrl(config.Config.Tap.GuiPort)
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err == nil {
logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
connectedToApiServer := false
if err := checkProxy(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√"))
}
if err := checkPortForward(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√"))
}
return connectedToApiServer
}
func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err)
}
return nil
}
func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
forwarder.Close()
return nil
}
func checkK8sResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nk8s-components\n--------------------")
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist
if config.Config.IsNsRestrictedMode() {
exist, err = kubernetesProvider.DoesRoleExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName)
allResourcesExist = checkResourceExist(kubernetes.RoleName, "role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesRoleBindingExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.RoleBindingName, "role binding", exist, err) && allResourcesExist
} else {
exist, err = kubernetesProvider.DoesClusterRoleExist(ctx, kubernetes.ClusterRoleName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleName, "cluster role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesClusterRoleBindingExist(ctx, kubernetes.ClusterRoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleBindingName, "cluster role binding", exist, err) && allResourcesExist
}
exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
allResourcesExist = checkResourceExist(kubernetes.ApiServerPodName, "service", exist, err) && allResourcesExist
allResourcesExist = checkPodResourcesExist(ctx, kubernetesProvider) && allResourcesExist
return allResourcesExist
}
func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
return false
} else if len(pods) == 0 {
logger.Log.Errorf("%v '%v' pod doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
} else if !kubernetes.IsPodRunning(&pods[0]) {
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
}
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pods are running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
return false
} else {
tappers := 0
notRunningTappers := 0
for _, pod := range pods {
tappers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
}
}
if notRunningTappers > 0 {
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
return false
}
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
return true
}
}
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking if '%v' %v exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType, err)
return false
} else if !exist {
logger.Log.Errorf("%v '%v' %v doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType)
return false
}
logger.Log.Infof("%v '%v' %v exists", fmt.Sprintf(uiUtils.Green, "√"), resourceName, resourceType)
return true
}
func checkK8sTapPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nkubernetes-permissions\n--------------------")
var filePath string
if config.Config.IsNsRestrictedMode() {
filePath = "permissionFiles/permissions-ns-tap.yaml"
} else {
filePath = "permissionFiles/permissions-all-namespaces-tap.yaml"
}
data, err := embedFS.ReadFile(filePath)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
obj, err := getDecodedObject(data)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
var rules []rbac.PolicyRule
if config.Config.IsNsRestrictedMode() {
rules = obj.(*rbac.Role).Rules
} else {
rules = obj.(*rbac.ClusterRole).Rules
}
return checkPermissions(ctx, kubernetesProvider, rules)
}
func getDecodedObject(data []byte) (runtime.Object, error) {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil {
return nil, err
}
return obj, nil
}
func checkPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule) bool {
permissionsExist := true
for _, rule := range rules {
for _, group := range rule.APIGroups {
for _, resource := range rule.Resources {
for _, verb := range rule.Verbs {
exist, err := kubernetesProvider.CanI(ctx, config.Config.MizuResourcesNamespace, resource, verb, group)
permissionsExist = checkPermissionExist(group, resource, verb, exist, err) && permissionsExist
}
}
}
}
return permissionsExist
}
func checkPermissionExist(group string, resource string, verb string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking permission for %v %v in group '%v', err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group, err)
return false
} else if !exist {
logger.Log.Errorf("%v can't %v %v in group '%v'", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group)
return false
}
logger.Log.Infof("%v can %v %v in group '%v'", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, group)
return true
}
func checkImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nimage-pull-in-cluster\n--------------------")
podName := "image-pull-in-cluster"
defer removeImagePullInClusterResources(ctx, kubernetesProvider, podName)
if err := createImagePullInClusterResources(ctx, kubernetesProvider, podName); err != nil {
logger.Log.Errorf("%v error while creating image pull in cluster resources, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, podName); err != nil {
logger.Log.Errorf("%v cluster is not able to pull mizu containers from docker hub, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v cluster is able to pull mizu containers from docker hub", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func removeImagePullInClusterResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) {
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, podName); err != nil {
logger.Log.Debugf("error while removing image pull in cluster resources, err: %v", err)
}
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
logger.Log.Debugf("error while removing image pull in cluster resources, err: %v", err)
}
}
}
func createImagePullInClusterResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) error {
if !config.Config.IsNsRestrictedMode() {
if _, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
return err
}
}
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: "up9inc/busybox",
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, config.Config.MizuResourcesNamespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -1,9 +1,13 @@
package configStructs
const (
PreTapCheckName = "pre-tap"
PreTapCheckName = "pre-tap"
PreInstallCheckName = "pre-install"
ImagePullCheckName = "image-pull"
)
type CheckConfig struct {
PreTap bool `yaml:"pre-tap"`
PreTap bool `yaml:"pre-tap"`
PreInstall bool `yaml:"pre-install"`
ImagePull bool `yaml:"image-pull"`
}

View File

@@ -11,7 +11,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8

View File

@@ -600,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6 h1:+RZTD+HdfIW2SMbc65yWkruTY+g5/1Av074m62A74ls=
github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=

View File

@@ -94,10 +94,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
continue
}
if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
if pod.Spec.NodeName != "" {
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
@@ -137,10 +133,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
continue
}
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf(
fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,

View File

@@ -19,7 +19,7 @@ import (
const mizuTestEnvVar = "MIZU_TEST"
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0
type Protocol struct {
@@ -83,6 +83,7 @@ type CounterPair struct {
type GenericMessage struct {
IsRequest bool `json:"isRequest"`
CaptureTime time.Time `json:"captureTime"`
CaptureSize int `json:"captureSize"`
Payload interface{} `json:"payload"`
}
@@ -110,13 +111,27 @@ type SuperIdentifier struct {
IsClosedOthers bool
}
type ReadProgress struct {
readBytes int
lastCurrent int
}
func (p *ReadProgress) Feed(n int) {
p.readBytes += n
}
func (p *ReadProgress) Current() (n int) {
p.lastCurrent = p.readBytes - p.lastCurrent
return p.lastCurrent
}
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
}
@@ -152,6 +167,8 @@ type Entry struct {
StartTime time.Time `json:"startTime"`
Request map[string]interface{} `json:"request"`
Response map[string]interface{} `json:"response"`
RequestSize int `json:"requestSize"`
ResponseSize int `json:"responseSize"`
ElapsedTime int64 `json:"elapsedTime"`
Rules ApplicableRules `json:"rules,omitempty"`
ContractStatus ContractStatus `json:"contractStatus,omitempty"`
@@ -164,7 +181,6 @@ type Entry struct {
type EntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data *Entry `json:"data"`
Base *BaseEntry `json:"base"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/amqp/\* expect

View File

@@ -94,7 +94,7 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, captureSize int, emitter api.Emitter, capture api.Capture) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
}
case *MethodFrame:
@@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
}
default:
@@ -236,6 +236,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
RequestSize: item.Pair.Request.CaptureSize,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: 0,
@@ -301,8 +302,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{})
var repRequest []interface{}
switch request["method"].(string) {

View File

@@ -122,7 +122,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -140,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -319,7 +319,7 @@ func TestRepresent(t *testing.T) {
var objects []string
for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response)
object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err)
objects = append(objects, string(object))
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -66,7 +66,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
streamID,
"HTTP2",
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, messageHTTP1.ProtoMinor)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -86,7 +86,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
streamID,
"HTTP2",
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, messageHTTP1.ProtoMinor)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
@@ -111,7 +111,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -139,7 +139,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
requestCounter,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -154,7 +154,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
return
}
func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -183,7 +183,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
responseCounter,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,

View File

@@ -86,7 +86,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
@@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher)
err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -148,7 +148,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
tcpID.DstPort,
"HTTP2",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -162,7 +162,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -271,14 +271,16 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
HTTPPair: string(httpPair),
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
RequestSize: item.Pair.Request.CaptureSize,
ResponseSize: item.Pair.Response.CaptureSize,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
HTTPPair: string(httpPair),
}
}
@@ -410,11 +412,9 @@ func representRequest(request map[string]interface{}) (repRequest []interface{})
return
}
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
func representResponse(response map[string]interface{}) (repResponse []interface{}) {
repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]api.TableData{
{
Name: "Status",
@@ -428,7 +428,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
},
{
Name: "Body Size (bytes)",
Value: bodySize,
Value: int64(response["bodySize"].(float64)),
Selector: `response.bodySize`,
},
})
@@ -471,10 +471,10 @@ func representResponse(response map[string]interface{}) (repResponse []interface
return
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{})
repRequest := representRequest(request)
repResponse, bodySize := representResponse(response)
repResponse := representResponse(response)
representation["request"] = repRequest
representation["response"] = repResponse
object, err = json.Marshal(representation)

View File

@@ -124,7 +124,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -142,7 +142,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -321,7 +321,7 @@ func TestRepresent(t *testing.T) {
var objects []string
for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response)
object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err)
objects = append(objects, string(object))
}

View File

@@ -24,10 +24,11 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map {
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, captureSize int, protoMinor int) *api.OutputChannelItem {
requestHTTPMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: api.HTTPPayload{
Type: TypeHttpRequest,
Data: request,
@@ -47,10 +48,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, captureSize int, protoMinor int) *api.OutputChannelItem {
responseHTTPMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: api.HTTPPayload{
Type: TypeHttpResponse,
Data: response,

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/kafka/\* expect

View File

@@ -35,7 +35,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
@@ -79,13 +79,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
RequestSize: item.Pair.Request.CaptureSize,
ResponseSize: item.Pair.Response.CaptureSize,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
}
}
@@ -208,8 +210,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{})
apiKey := ApiKey(request["apiKey"].(float64))

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -320,7 +320,7 @@ func TestRepresent(t *testing.T) {
var objects []string
for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response)
object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err)
objects = append(objects, string(object))
}

View File

@@ -265,6 +265,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Request: api.GenericMessage{
IsRequest: true,
CaptureTime: reqResPair.Request.CaptureTime,
CaptureSize: int(reqResPair.Request.Size),
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],
@@ -276,6 +277,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Response: api.GenericMessage{
IsRequest: false,
CaptureTime: reqResPair.Response.CaptureTime,
CaptureSize: int(reqResPair.Response.Size),
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/redis/\* expect

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
@@ -21,7 +21,7 @@ func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
requestCounter,
)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime, progress.Current())
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
@@ -36,7 +36,7 @@ func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
return nil
}
func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
@@ -51,7 +51,7 @@ func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
responseCounter,
)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime, progress.Current())
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{

View File

@@ -34,7 +34,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
@@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
}
if isClient {
err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleClientStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleServerStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -82,13 +82,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
RequestSize: item.Pair.Request.CaptureSize,
ResponseSize: item.Pair.Response.CaptureSize,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
}
}
@@ -131,8 +133,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{})
repRequest := representGeneric(request, `request.`)
repResponse := representGeneric(response, `response.`)

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -320,7 +320,7 @@ func TestRepresent(t *testing.T) {
var objects []string
for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response)
object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err)
objects = append(objects, string(object))
}

View File

@@ -22,10 +22,11 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map {
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time, captureSize int) *api.OutputChannelItem {
requestRedisMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: RedisPayload{
Data: &RedisWrapper{
Method: string(request.Command),
@@ -48,10 +49,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time, captureSize int) *api.OutputChannelItem {
responseRedisMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: RedisPayload{
Data: &RedisWrapper{
Method: string(response.Command),

View File

@@ -40,6 +40,7 @@ type tcpReader struct {
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
progress *api.ReadProgress
superTimer *api.SuperTimer
parent *tcpStream
packetsSeen uint
@@ -80,6 +81,8 @@ func (h *tcpReader) Read(p []byte) (int, error) {
l := copy(p, h.data)
h.data = h.data[l:]
h.progress.Feed(l)
return l, nil
}
@@ -96,7 +99,7 @@ func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
// TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method
err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
err := h.extension.Dissector.Dissect(b, h.progress, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -89,6 +89,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
progress: &api.ReadProgress{},
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
@@ -108,6 +109,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
progress: &api.ReadProgress{},
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{

View File

@@ -146,6 +146,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
doneHandler: func(r *tlsReader) {
p.closeReader(key, r)
},
progress: &api.ReadProgress{},
}
tcpid := p.buildTcpId(chunk, ip, port)
@@ -158,7 +159,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
if err != nil {

View File

@@ -3,6 +3,8 @@ package tlstapper
import (
"io"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tlsReader struct {
@@ -10,6 +12,7 @@ type tlsReader struct {
chunks chan *tlsChunk
data []byte
doneHandler func(r *tlsReader)
progress *api.ReadProgress
}
func (r *tlsReader) Read(p []byte) (int, error) {
@@ -36,6 +39,7 @@ func (r *tlsReader) Read(p []byte) (int, error) {
l := copy(p, r.data)
r.data = r.data[l:]
r.progress.Feed(l)
return l, nil
}

View File

@@ -1,6 +1,6 @@
{
"name": "@up9/mizu-common",
"version": "1.0.128",
"version": "1.0.130",
"description": "Made with create-react-library",
"author": "",
"license": "MIT",

View File

@@ -36,23 +36,36 @@ const useStyles = makeStyles(() => ({
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
const EntryTitle: React.FC<any> = ({protocol, data, elapsedTime}) => {
const classes = useStyles();
const request = data.request;
const response = data.response;
return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response && <Queryable
query={`response.bodySize == ${bodySize}`}
{request && <Queryable
query={`requestSize == ${data.requestSize}`}
style={{margin: "0 18px"}}
displayIconOnMouseOver={true}
>
<div
style={{opacity: 0.5}}
id="entryDetailedTitleBodySize"
id="entryDetailedTitleRequestSize"
>
{formatSize(bodySize)}
{`Request: ${formatSize(data.requestSize)}`}
</div>
</Queryable>}
{response && <Queryable
query={`responseSize == ${data.responseSize}`}
style={{margin: "0 18px"}}
displayIconOnMouseOver={true}
>
<div
style={{opacity: 0.5}}
id="entryDetailedTitleResponseSize"
>
{`Response: ${formatSize(data.responseSize)}`}
</div>
</Queryable>}
{response && <Queryable
@@ -64,7 +77,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
style={{opacity: 0.5}}
id="entryDetailedTitleElapsedTime"
>
{Math.round(elapsedTime)}ms
{`Elapsed Time: ${Math.round(elapsedTime)}ms`}
</div>
</Queryable>}
</div>
@@ -120,7 +133,6 @@ export const EntryDetailed = () => {
{entryData && <EntryTitle
protocol={entryData.protocol}
data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>}
{entryData && <EntrySummary entry={entryData.base}/>}

View File

@@ -4,7 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz';
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../../UI/Protocol"
import eBPFLogo from '../assets/ebpf.png';
import eBPFLogo from '../../assets/ebpf.png';
import {Summary} from "../../UI/Summary";
import Queryable from "../../UI/Queryable";
import ingoingIconSuccess from "assets/ingoing-traffic-success.svg"

View File

@@ -48,10 +48,13 @@ interface TrafficViewerProps {
trafficViewerApiProp: TrafficViewerApi,
actionButtons?: JSX.Element,
isShowStatusBar?: boolean,
webSocketUrl : string
webSocketUrl : string,
isCloseWebSocket : boolean
}
export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus, trafficViewerApiProp, actionButtons,isShowStatusBar,webSocketUrl}) => {
export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus, trafficViewerApiProp,
actionButtons,isShowStatusBar,webSocketUrl,
isCloseWebSocket}) => {
const classes = useLayoutStyles();
@@ -103,6 +106,10 @@ export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus,
handleQueryChange(query);
}, [query, handleQueryChange]);
useEffect(()=>{
isCloseWebSocket && closeWebSocket()
},[isCloseWebSocket])
const ws = useRef(null);
const listEntry = useRef(null);
@@ -142,6 +149,10 @@ export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus,
}, 500)
}
const closeWebSocket = () => {
ws.current && ws.current.close()
}
if (ws.current) {
ws.current.onmessage = (e) => {
if (!e?.data) return;
@@ -200,7 +211,7 @@ export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus,
}
useEffect(() => {
setTrafficViewerApiState({...trafficViewerApiProp, webSocket : {close : () => ws.current.close()}});
setTrafficViewerApiState({...trafficViewerApiProp, webSocket : {close : closeWebSocket}});
(async () => {
openWebSocket("leftOff(-1)", true);
try{
@@ -330,10 +341,13 @@ export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus,
};
const MemoiedTrafficViewer = React.memo(TrafficViewer)
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, trafficViewerApiProp, actionButtons, isShowStatusBar = true ,webSocketUrl}) => {
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, trafficViewerApiProp,
actionButtons, isShowStatusBar = true ,
webSocketUrl, isCloseWebSocket}) => {
return <RecoilRoot>
<MemoiedTrafficViewer actionButtons={actionButtons} isShowStatusBar={isShowStatusBar} webSocketUrl={webSocketUrl}
trafficViewerApiProp={trafficViewerApiProp} setAnalyzeStatus={setAnalyzeStatus} />
isCloseWebSocket={isCloseWebSocket} trafficViewerApiProp={trafficViewerApiProp}
setAnalyzeStatus={setAnalyzeStatus} />
</RecoilRoot>
}

View File

@@ -1,13 +1,13 @@
type TrafficViewerApi = {
validateQuery : (query: any) => any
tapStatus : () => any
analyzeStatus : () => any
fetchEntries : (leftOff: any, direction: number, query: any, limit: number, timeoutMs: number) => any
getEntry : (entryId : any, query:string) => any
getRecentTLSLinks : () => any,
webSocket : {
close : () => {}
}
validateQuery: (query: any) => any
tapStatus: () => any
analyzeStatus: () => any
fetchEntries: (leftOff: any, direction: number, query: any, limit: number, timeoutMs: number) => any
getEntry: (entryId: any, query: string) => any
getRecentTLSLinks: () => any,
webSocket: {
close: () => void
}
}
export default TrafficViewerApi
export default TrafficViewerApi

View File

Before

Width:  |  Height:  |  Size: 21 KiB

After

Width:  |  Height:  |  Size: 21 KiB

View File

@@ -13,7 +13,7 @@
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10",
"@uiw/react-textarea-code-editor": "^1.4.12",
"@up9/mizu-common": "^1.0.128",
"@up9/mizu-common": "^1.0.130",
"axios": "^0.25.0",
"core-js": "^3.20.2",
"craco-babel-loader": "^1.0.3",

View File

@@ -1,4 +1,4 @@
import React, {useEffect} from "react";
import React, {useEffect, useState} from "react";
import { Button } from "@material-ui/core";
import Api,{getWebsocketUrl} from "../../../helpers/api";
import debounce from 'lodash/debounce';
@@ -8,8 +8,8 @@ import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
import TrafficViewer from "@up9/mizu-common"
import "@up9/mizu-common/dist/index.css"
import oasModalOpenAtom from "../../../recoil/oasModalOpen/atom";
import serviceMap from "../../assets/serviceMap.svg";
import services from "../../assets/services.svg";
import serviceMap from "../../assets/serviceMap.svg";
import services from "../../assets/services.svg";
interface TrafficPageProps {
setAnalyzeStatus?: (status: any) => void;
@@ -21,38 +21,40 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const commonClasses = useCommonStyles();
const setServiceMapModalOpen = useSetRecoilState(serviceMapModalOpenAtom);
const [openOasModal, setOpenOasModal] = useRecoilState(oasModalOpenAtom);
const [openWebSocket, setOpenWebSocket] = useState(true);
const trafficViewerApi = {...api}
const handleOpenOasModal = () => {
//closeSocket() -- Todo: Add Close webSocket
setOpenOasModal(true);
const handleOpenOasModal = () => {
setOpenWebSocket(false)
setOpenOasModal(true);
}
const openServiceMapModalDebounce = debounce(() => {
setOpenWebSocket(false)
setServiceMapModalOpen(true)
}, 500);
const actionButtons = (window["isOasEnabled"] || window["isServiceMapEnabled"]) &&
<div style={{ display: 'flex', height: "100%" }}>
{window["isOasEnabled"] && <Button
startIcon={<img className="custom" src={services} alt="services"></img>}
size="large"
type="submit"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25 }}
onClick={handleOpenOasModal}>
Show OAS
</Button>}
{window["isServiceMapEnabled"] && <Button
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{marginRight:"8%"}}></img>}
size="large"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
onClick={openServiceMapModalDebounce}>
Service Map
</Button>}
const actionButtons = (window["isOasEnabled"] || window["isServiceMapEnabled"]) &&
<div style={{ display: 'flex', height: "100%" }}>
{window["isOasEnabled"] && <Button
startIcon={<img className="custom" src={services} alt="services"></img>}
size="large"
type="submit"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
style={{ marginRight: 25 }}
onClick={handleOpenOasModal}>
Show OAS
</Button>}
{window["isServiceMapEnabled"] && <Button
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{marginRight:"8%"}}></img>}
size="large"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
onClick={openServiceMapModalDebounce}>
Service Map
</Button>}
</div>
useEffect(() => {
@@ -61,9 +63,9 @@ const trafficViewerApi = {...api}
}
},[])
return (
return (
<>
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={getWebsocketUrl()}
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={getWebsocketUrl()} isCloseWebSocket={!openWebSocket}
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!openOasModal}/>
</>
);