Compare commits

...

16 Commits

Author SHA1 Message Date
RamiBerm
18be46809e TRA-3903 minor daemon mode refactor (#479)
* Update common.go and tapRunner.go

* Update common.go
2021-11-17 11:18:08 +02:00
RamiBerm
b7f7daa05c TRA-3903 fix daemon mode in permission restricted configs (#473)
* Update tapRunner.go, permissions-all-namespaces-daemon.yaml, and 2 more files...

* Update tapRunner.go

* Update tapRunner.go and permissions-ns-daemon.yaml

* Update tapRunner.go

* Update tapRunner.go

* Update tapRunner.go
2021-11-17 11:14:43 +02:00
M. Mert Yıldıran
95d2a868e1 Update the UI screenshots (#476)
* Update the UI screenshots

* Update `mizu-ui.png`
2021-11-16 22:44:31 +03:00
RamiBerm
36077a9985 TRA-3903 - display targetted pods before waiting for all daemon resources to be created (#475)
* WIP

* Update tapRunner.go

* Update tapRunner.go
2021-11-16 17:53:38 +02:00
RamiBerm
51e0dd8ba9 TRA-3903 add flag to disable pvc creation for daemon mode (#474)
* Update tapRunner.go and tapConfig.go

* Update tapConfig.go

* Revert "Update tapConfig.go"

This reverts commit 5c7c02c4ab.
2021-11-16 17:11:47 +02:00
M. Mert Yıldıran
7f265dc4c5 Return 404 instead of 500 if the entry could not be found and display a toast message (#464) 2021-11-16 17:13:07 +03:00
RoyUP9
1c75ce314b fixed redact acceptance test (#472) 2021-11-16 15:49:08 +02:00
RamiBerm
89836d8d75 TRA-3903 better health endpoint for daemon mode (#471)
* Update main.go, status_controller.go, and 2 more files...

* Update status_controller.go and mizuTapperSyncer.go
2021-11-16 15:44:27 +02:00
RoyUP9
763f72a640 remove newline in logs, fixed logs time format (#469) 2021-11-16 12:07:48 +02:00
Igor Gov
a6ec246dd1 Stop reduction of user agent header (#468) 2021-11-16 11:33:31 +02:00
RoyUP9
3e30815fb4 changes log format to be more readable (#463) 2021-11-16 11:01:40 +02:00
M. Mert Yıldıran
a6bf39fad5 Prevent elapsedTime to be negative (#467)
Also fix the `elapsedTime` for Redis.
2021-11-16 02:52:48 +03:00
M. Mert Yıldıran
58a1eac247 Set response.bodySize to 0 if it's negative (#466) 2021-11-16 01:58:22 +03:00
M. Mert Yıldıran
ad574956df Upgrade Basenine version from 0.2.8 to 0.2.9 (#465)
Fixes `limit` helper being not finished because of lack of meta updates.
2021-11-16 00:53:29 +03:00
M. Mert Yıldıran
618cb3a409 Optimize UI entry feed performance (#452)
* Optimize the React code for feeding the entries

By building `EntryItem` only once and updating the `entries` state on meta query messages.

* Upgrade `react-scrollable-feed-virtualized` version from `1.4.3` to `1.4.8`

* Fix the `isSelected` state

* Set the query text before deciding the background to prevent lags while typing

* Upgrade Basenine version from `0.2.6` to `0.2.7`

* Set the query background color only if the query is same after the HTTP request and use `useEffect` instead

* Upgrade Basenine version from `0.2.7` to `0.2.8`

* Use `CancelToken` of `axios` instead of trying to check the query state

* Turn `updateQuery` function into a state hook

* Update the macro for `http`

* Do the `source.cancel()` call in `axios.CancelToken`

* Reduce client-side logging
2021-11-15 17:32:05 +03:00
M. Mert Yıldıran
2582b7a65c Ignore SNYK-JS-JSONSCHEMA-1920922 (#462)
Dependency tree:
`node-sass@5.0.0 > node-gyp@7.1.2 > request@2.88.2 > http-signature@1.2.0 > jsprim@1.4.1 > json-schema@0.2.3`

`node-sass` should fix it first.
2021-11-15 17:29:20 +03:00
49 changed files with 385 additions and 261 deletions

View File

@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64

View File

@@ -427,9 +427,10 @@ func TestTapRedact(t *testing.T) {
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
requestHeaders := map[string]string{"User-Header": "Mizu"}
requestBody := map[string]string{"User": "Mizu"}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
@@ -460,12 +461,12 @@ func TestTapRedact(t *testing.T) {
headers := request["_headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
if header["name"].(string) != "User-Header" {
continue
}
userAgent := header["value"].(string)
if userAgent != "[REDACTED]" {
userHeader := header["value"].(string)
if userHeader != "[REDACTED]" {
return fmt.Errorf("unexpected result - user agent is not redacted")
}
}
@@ -530,9 +531,10 @@ func TestTapNoRedact(t *testing.T) {
}
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
requestHeaders := map[string]string{"User-Header": "Mizu"}
requestBody := map[string]string{"User": "Mizu"}
for i := 0; i < defaultEntriesCount; i++ {
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
@@ -563,12 +565,12 @@ func TestTapNoRedact(t *testing.T) {
headers := request["_headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
if header["name"].(string) != "User-Header" {
continue
}
userAgent := header["value"].(string)
if userAgent == "[REDACTED]" {
userHeader := header["value"].(string)
if userHeader == "[REDACTED]" {
return fmt.Errorf("unexpected result - user agent is redacted")
}
}

View File

@@ -240,13 +240,24 @@ func executeHttpGetRequest(url string) (interface{}, error) {
return executeHttpRequest(response, requestErr)
}
func executeHttpPostRequest(url string, body interface{}) (interface{}, error) {
func executeHttpPostRequestWithHeaders(url string, headers map[string]string, body interface{}) (interface{}, error) {
requestBody, jsonErr := json.Marshal(body)
if jsonErr != nil {
return nil, jsonErr
}
response, requestErr := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
request, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
request.Header.Add("Content-Type", "application/json")
for headerKey, headerValue := range headers {
request.Header.Add(headerKey, headerValue)
}
client := &http.Client{}
response, requestErr := client.Do(request)
return executeHttpRequest(response, requestErr)
}

View File

@@ -16,7 +16,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084 h1:gLoP7AyS/c6pYuBQOgALWpzzc5/aSrq98Lr49JRfmfs=
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5 h1:JbLairDLEJpAC8bwmFuOAB+LYpY/oQbzGRSWRpkF7PQ=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -207,7 +207,7 @@ func loadExtensions() {
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
logger.Log.Infof("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -219,7 +219,7 @@ func loadExtensions() {
var ok bool
dissector, ok = symDissector.(tapApi.Dissector)
if err != nil || !ok {
panic(fmt.Sprintf("Failed to load the extension: %s\n", extension.Path))
panic(fmt.Sprintf("Failed to load the extension: %s", extension.Path))
}
dissector.Register(extension)
extension.Dissector = dissector
@@ -232,7 +232,7 @@ func loadExtensions() {
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v", extension)
}
controllers.InitExtensionsMap(extensionsMap)
@@ -459,7 +459,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
return
}
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
case _, ok := <-tapperSyncer.TapPodChangesOut:
case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut:
if !ok {
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
return
@@ -472,6 +472,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
}
api.BroadcastToBrowserClients(serializedTapStatus)
providers.TapStatus.Pods = tapStatus.Pods
providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount
case <-ctx.Done():
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
return

View File

@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
logger.Log.Infof("Waiting for new files\n")
logger.Log.Infof("Waiting for new files")
time.Sleep(3 * time.Second)
continue
}
@@ -109,7 +109,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
if err != nil {
logger.Log.Infof("Disabled OAS validation: %s\n", err.Error())
logger.Log.Infof("Disabled OAS validation: %s", err.Error())
disableOASValidation = true
}
@@ -154,7 +154,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
@@ -162,7 +162,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}

View File

@@ -146,7 +146,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error recieving metadata: %v\n", err.Error())
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
}
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
@@ -167,7 +167,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
func socketCleanup(socketId int, socketConnection *SocketConnection) {
err := socketConnection.connection.Close()
if err != nil {
logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
logger.Log.Errorf("Error closing socket connection for socket id %d: %v", socketId, err)
}
websocketIdsLock.Lock()

View File

@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var socketMessageBase shared.WebSocketMessageMetadata
err := json.Unmarshal(message, &socketMessageBase)
if err != nil {
logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
logger.Log.Infof("Could not unmarshal websocket message %v", err)
} else {
switch socketMessageBase.MessageType {
case shared.WebSocketMessageTypeTappedEntry:
var tappedEntryMessage models.WebSocketTappedEntryMessage
err := json.Unmarshal(message, &tappedEntryMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
h.SocketOutChannel <- tappedEntryMessage.Data
@@ -81,7 +81,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var statusMessage shared.WebSocketStatusMessage
err := json.Unmarshal(message, &statusMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
BroadcastToBrowserClients(message)
@@ -90,7 +90,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var outboundLinkMessage models.WebsocketOutboundLinkMessage
err := json.Unmarshal(message, &outboundLinkMessage)
if err != nil {
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
} else {
handleTLSLink(outboundLinkMessage)
}

View File

@@ -25,7 +25,12 @@ func Error(c *gin.Context, err error) bool {
if err != nil {
logger.Log.Errorf("Error getting entry: %v", err)
c.Error(err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": true, "msg": err.Error()})
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": err.Error(),
})
return true // signal that there was an error and the caller should return
}
return false // no error, can continue
@@ -39,7 +44,13 @@ func GetEntry(c *gin.Context) {
return // exit
}
err = json.Unmarshal(bytes, &entry)
if Error(c, err) {
if err != nil {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": string(bytes),
})
return // exit
}

View File

@@ -2,7 +2,9 @@ package controllers
import (
"encoding/json"
"fmt"
"mizuserver/pkg/api"
"mizuserver/pkg/config"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
@@ -15,6 +17,13 @@ import (
)
func HealthCheck(c *gin.Context) {
if config.Config.DaemonMode {
if providers.ExpectedTapperAmount != providers.TappersCount {
c.JSON(http.StatusInternalServerError, fmt.Sprintf("expecting more tappers than are actually connected (%d expected, %d connected)", providers.ExpectedTapperAmount, providers.TappersCount))
return
}
}
response := shared.HealthResponse{
TapStatus: providers.TapStatus,
TappersCount: providers.TappersCount,
@@ -37,7 +46,7 @@ func PostTappedPods(c *gin.Context) {
providers.TapStatus.Pods = tapStatus.Pods
message := shared.CreateWebSocketStatusMessage(*tapStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
logger.Log.Errorf("Could not Marshal message %v\n", err)
logger.Log.Errorf("Could not Marshal message %v", err)
} else {
api.BroadcastToBrowserClients(jsonBytes)
}

View File

@@ -19,7 +19,7 @@ var (
TapStatus shared.TapStatus
authStatus *models.AuthStatus
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
ExpectedTapperAmount = -1 //only relevant in daemon mode as cli manages tappers otherwise
tappersCountLock = sync.Mutex{}
)

View File

@@ -164,10 +164,10 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil\n", key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
logger.Log.Infof("setting %s=%s\n", key, resolved)
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
@@ -188,7 +188,7 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v", err)
return
}
}

View File

@@ -112,14 +112,14 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
}
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
logger.Log.Infof("Sync entries - started\n")
logger.Log.Infof("Sync entries - started")
var (
token, model string
guestMode bool
)
if syncEntriesConfig.Token == "" {
logger.Log.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
logger.Log.Infof("Sync entries - creating anonymous token. env %s", syncEntriesConfig.Env)
guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
if err != nil {
return fmt.Errorf("failed creating anonymous token, err: %v", err)
@@ -133,7 +133,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
model = syncEntriesConfig.Workspace
guestMode = false
logger.Log.Infof("Sync entries - upserting model. env %s, model %s\n", syncEntriesConfig.Env, model)
logger.Log.Infof("Sync entries - upserting model. env %s, model %s", syncEntriesConfig.Env, model)
if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
return fmt.Errorf("failed upserting model, err: %v", err)
}
@@ -144,7 +144,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
return fmt.Errorf("invalid model name, model name: %s", model)
}
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v", token, model, guestMode)
go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)
return nil
@@ -209,7 +209,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
// "http or grpc" filter indicates that we're only interested in HTTP and gRPC entries
query := "http or grpc"
logger.Log.Infof("Getting entries from the database\n")
logger.Log.Infof("Getting entries from the database")
var connection *basenine.Connection
var err error

Binary file not shown.

Before

Width:  |  Height:  |  Size: 491 KiB

After

Width:  |  Height:  |  Size: 640 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 55 KiB

After

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

After

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -4,9 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
@@ -64,3 +70,42 @@ func handleKubernetesProviderError(err error) {
logger.Log.Error(err)
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"path"
"regexp"
"strings"
"time"
@@ -25,7 +24,6 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -119,6 +117,9 @@ func RunMizuTap() {
logger.Log.Infof("Tapping pods in %s", namespacesStr)
if config.Config.Tap.DryRun {
if err := printTappedPodsPreview(ctx, kubernetesProvider, targetNamespaces); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err)))
}
return
}
@@ -134,7 +135,7 @@ func RunMizuTap() {
return
}
if config.Config.Tap.DaemonMode {
if err := handleDaemonModePostCreation(cancel, kubernetesProvider); err != nil {
if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, targetNamespaces); err != nil {
defer finishMizuExecution(kubernetesProvider, apiProvider)
cancel()
} else {
@@ -156,28 +157,38 @@ func RunMizuTap() {
}
}
func handleDaemonModePostCreation(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) error {
func handleDaemonModePostCreation(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if err := printTappedPodsPreview(ctx, kubernetesProvider, namespaces); err != nil {
return err
}
apiProvider := apiserver.NewProvider(GetApiServerUrl(), 90, 1*time.Second)
if err := waitForDaemonModeToBeReady(cancel, kubernetesProvider, apiProvider); err != nil {
return err
}
if err := printDaemonModeTappedPods(apiProvider); err != nil {
return err
}
return nil
}
func printDaemonModeTappedPods(apiProvider *apiserver.Provider) error {
if healthStatus, err := apiProvider.GetHealthStatus(); err != nil {
/*
this function is a bit problematic as it might be detached from the actual pods the mizu api server will tap.
The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
return err
} else {
for _, tappedPod := range healthStatus.TapStatus.Pods {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
logger.Log.Info("Pods that match the provided criteria at this instant:")
for _, tappedPod := range matchingPods {
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
}
return nil
}
return nil
}
func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) error {
@@ -215,11 +226,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
}
if len(tapperSyncer.CurrentlyTappedPods) == 0 {
var suggestionStr string
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
}
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
printNoPodsFoundSuggestion(targetNamespaces)
}
go func() {
@@ -252,6 +259,14 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A"
}
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any currently running pods that match the regex argument, mizu will automatically tap matching pods if any are created later%s", suggestionStr))
}
func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
@@ -282,7 +297,7 @@ func createMizuResources(ctx context.Context, cancel context.CancelFunc, kuberne
}
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v", errormessage.FormatError(err)))
}
var err error
@@ -359,20 +374,9 @@ func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.
}
func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
volumeClaimCreated := false
if err != nil {
return err
}
if isDefaultStorageClassAvailable {
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this will mean that mizu's data will be lost on pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
} else {
volumeClaimCreated = true
}
} else {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default volume provider in this cluster, this will mean that mizu's data will be lost on pod restart")
if !config.Config.Tap.NoPersistentVolumeClaim {
volumeClaimCreated = TryToCreatePersistentVolumeClaim(ctx, kubernetesProvider)
}
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName)
@@ -387,6 +391,26 @@ func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kube
return nil
}
func TryToCreatePersistentVolumeClaim(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
if err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error occured when checking if a default storage provider exists in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error checking if default storage class exists: %v", err)
return false
} else if !isDefaultStorageClassAvailable {
logger.Log.Warningf(uiUtils.Yellow, "Could not find default storage provider in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
return false
}
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this means mizu data will be lost on mizu-api-server pod restart")
logger.Log.Debugf("error creating persistent volume claim: %v", err)
return false
}
return true
}
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
var compiledRegexSlice []*api.SerializableRegexp
@@ -421,45 +445,6 @@ func getSyncEntriesConfig() *shared.SyncEntriesConfig {
}
}
func finishMizuExecution(kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) {
telemetry.ReportAPICalls(apiProvider)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
logger.Log.Infof("\nRemoving mizu resources\n")
var leftoverResources []string
if config.Config.IsNsRestrictedMode() {
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
} else {
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
logger.Log.Errorf(uiUtils.Error, errMsg)
}
}
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
leftoverResources := make([]string, 0)
@@ -626,7 +611,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
break
}
logger.Log.Infof("Mizu is available at %s\n", url)
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}

View File

@@ -56,7 +56,7 @@ func runMizuView() {
return
}
logger.Log.Infof("Mizu is available at %s\n", url)
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)

View File

@@ -3,6 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"github.com/up9inc/mizu/shared"
@@ -26,25 +28,26 @@ const (
)
type TapConfig struct {
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ContractFile string `yaml:"contract"`
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
ApiServerResources shared.Resources `yaml:"api-server-resources"`
TapperResources shared.Resources `yaml:"tapper-resources"`
DaemonMode bool `yaml:"daemon" default:"false"`
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ContractFile string `yaml:"contract"`
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
ApiServerResources shared.Resources `yaml:"api-server-resources"`
TapperResources shared.Resources `yaml:"tapper-resources"`
DaemonMode bool `yaml:"daemon" default:"false"`
NoPersistentVolumeClaim bool `yaml:"no-persistent-volume-claim" default:"false"`
}
func (config *TapConfig) PodRegex() *regexp.Regexp {
@@ -79,5 +82,9 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Can't run with both --%s and --%s flags", AnalysisTapName, WorkspaceTapName))
}
if config.NoPersistentVolumeClaim && !config.DaemonMode {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("the --set tap.no-persistent-volume-claim=true flag has no effect without the --%s flag, the claim will not be created anyway.", DaemonModeTapName))
}
return nil
}

View File

@@ -78,6 +78,6 @@ func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath strin
logger.Log.Debugf("Successfully added file %s", GetLogFilePath())
}
logger.Log.Infof("You can find the zip file with all logs in %s\n", filePath)
logger.Log.Infof("You can find the zip file with all logs in %s", filePath)
return nil
}

View File

@@ -37,8 +37,8 @@ COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64

View File

@@ -7,15 +7,15 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "create", "delete" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list", "watch", "create", "delete"]

View File

@@ -8,7 +8,7 @@ rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "delete"]
- apiGroups: [ "" ]
- apiGroups: [ "apps" ]
resources: [ "deployments" ]
verbs: [ "get", "create", "delete" ]
- apiGroups: [""]
@@ -16,7 +16,7 @@ rules:
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
verbs: ["get", "create", "patch", "delete"]
verbs: ["get", "create", "patch", "delete", "list"]
- apiGroups: [""]
resources: ["services/proxy"]
verbs: ["get"]
@@ -32,7 +32,7 @@ rules:
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["rolebindings"]
verbs: ["get", "create", "delete"]
- apiGroups: ["apps", "extensions"]
- apiGroups: ["apps", "extensions", ""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps", "extensions"]

View File

@@ -16,18 +16,20 @@ import (
const updateTappersDelay = 5 * time.Second
type TappedPodChangeEvent struct {
Added []core.Pod
Removed []core.Pod
Added []core.Pod
Removed []core.Pod
ExpectedTapperAmount int
}
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
type MizuTapperSyncer struct {
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
nodeToTappedPodIPMap map[string][]string
}
type TapperSyncerConfig struct {
@@ -160,9 +162,11 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
tapperSyncer.nodeToTappedPodIPMap = GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
Added: addedPods,
Removed: removedPods,
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodIPMap),
}
return nil, true
}
@@ -171,9 +175,7 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
nodeToTappedPodIPMap := GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
if len(tapperSyncer.nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if tapperSyncer.config.MizuServiceAccountExists {
serviceAccountName = ServiceAccountName
@@ -188,7 +190,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.AgentImage,
TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
nodeToTappedPodIPMap,
tapperSyncer.nodeToTappedPodIPMap,
serviceAccountName,
tapperSyncer.config.TapperResources,
tapperSyncer.config.ImagePullPolicy,
@@ -197,7 +199,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodIPMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
return err

View File

@@ -579,6 +579,11 @@ func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string,
return provider.handleRemovalError(err)
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
err := provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) handleRemovalError(err error) error {
// Ignore NotFound - There is nothing to delete.
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
@@ -859,10 +864,6 @@ func (provider *Provider) CreatePersistentVolumeClaim(ctx context.Context, names
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, volumeClaim, metav1.CreateOptions{})
}
func (provider *Provider) RemovePersistentVolumeClaim(ctx context.Context, namespace string, volumeClaimName string) error {
return provider.clientSet.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, volumeClaimName, metav1.DeleteOptions{})
}
func getClientSet(config *restclient.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {

View File

@@ -9,7 +9,7 @@ import (
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time} %{level:.5s} ▶ %{pid} %{shortfile} %{shortfunc} ▶ %{message}`,
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
)
func InitLogger(logPath string) {

View File

@@ -2,9 +2,9 @@ package shared
import (
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"strings"
"gopkg.in/yaml.v3"
@@ -141,14 +141,12 @@ func (r *RulePolicy) validateType() bool {
permitedTypes := []string{"json", "header", "slo"}
_, found := Find(permitedTypes, r.Type)
if !found {
log.Printf("Error: %s. ", r.Name)
log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n")
logger.Log.Errorf("Only json, header and slo types are supported on rule definition. This rule will be ignored. rule name: %s", r.Name)
found = false
}
if strings.ToLower(r.Type) == "slo" {
if r.ResponseTime <= 0 {
log.Printf("Error: %s. ", r.Name)
log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n")
logger.Log.Errorf("When rule type is slo, the field response-time should be specified and have a value >= 1. rule name: %s", r.Name)
found = false
}
}

View File

@@ -287,7 +287,7 @@ func (h HTTPPayload) MarshalJSON() ([]byte, error) {
RawResponse: &HTTPResponseWrapper{Response: h.Data.(*http.Response)},
})
default:
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s\n", h.Type))
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s", h.Type))
}
}

View File

@@ -39,7 +39,7 @@ func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
logger.Log.Infof("Writing memory profile to %s\n", filename)
logger.Log.Infof("Writing memory profile to %s", filename)
f, err := os.Create(filename)
if err != nil {

View File

@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
const amqpRequest string = "amqp_request"
@@ -218,7 +218,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
default:
// log.Printf("unexpected frame: %+v\n", f)
// log.Printf("unexpected frame: %+v", f)
}
}
}

View File

@@ -58,7 +58,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -143,6 +143,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
}
if resDetails["bodySize"].(float64) < 0 {
resDetails["bodySize"] = 0
}
if item.Protocol.Version == "2.0" {
service = fmt.Sprintf("%s://%s", scheme, authority)
} else {
@@ -180,6 +184,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
httpPair, _ := json.Marshal(item.Pair)
_protocol := protocol
_protocol.Version = item.Protocol.Version
@@ -418,7 +425,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`http`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
}

View File

@@ -16,6 +16,7 @@ import (
)
const maskedFieldPlaceholderValue = "[REDACTED]"
const userAgent = "user-agent"
//these values MUST be all lower case and contain no `-` or `_` characters
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
@@ -32,7 +33,7 @@ func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteri
request := item.Pair.Request.Payload.(api.HTTPPayload).Data.(*http.Request)
for headerKey, headerValues := range request.Header {
if strings.ToLower(headerKey) == "user-agent" {
if strings.ToLower(headerKey) == userAgent {
for _, userAgent := range options.IgnoredUserAgents {
for _, headerValue := range headerValues {
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
@@ -89,6 +90,10 @@ func filterResponseBody(response *http.Response, options *api.TrafficFilteringOp
func filterHeaders(headers *http.Header) {
for key, _ := range *headers {
if strings.ToLower(key) == userAgent {
continue
}
if strings.ToLower(key) == "cookie" {
headers.Del(key)
} else if isFieldNameSensitive(key) {

View File

@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -149,6 +149,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
return &api.MizuEntry{
Protocol: _protocol,
Source: &api.TCP{

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -35,7 +36,7 @@ func (d dissecting) Register(extension *api.Extension) {
}
func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
@@ -82,6 +83,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
}
return &api.MizuEntry{
Protocol: protocol,
Source: &api.TCP{
@@ -104,7 +109,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Service: service,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: 0,
ElapsedTime: elapsedTime,
Summary: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,

View File

@@ -313,7 +313,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
packet.Value = fmt.Sprintf("%s]", packet.Value)
}
default:
msg := fmt.Sprintf("Unrecognized element in Redis array: %v\n", reflect.TypeOf(array[0]))
msg := fmt.Sprintf("Unrecognized element in Redis array: %v", reflect.TypeOf(array[0]))
err = errors.New(msg)
return
}
@@ -333,7 +333,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
case int64:
packet.Value = fmt.Sprintf("%d", x.(int64))
default:
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
msg := fmt.Sprintf("Unrecognized Redis data type: %v", reflect.TypeOf(x))
err = errors.New(msg)
return
}

View File

@@ -193,7 +193,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
logger.Log.Errorf("Error dumping memory profile %v\n", err)
logger.Log.Errorf("Error dumping memory profile %v", err)
}
assembler.waitAndDump()

View File

@@ -78,7 +78,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
logger.Log.Fatalf("Failed to set network layer for checksum: %s", err)
}
}
c := context{

View File

@@ -66,7 +66,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
logger.Log.Debugf("Detected TLS client hello with SNI %s", clientHello.SNI)
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)

View File

@@ -46,7 +46,7 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
diagnose.AppStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {

View File

@@ -33,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
continue
}
logger.Log.Infof("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
@@ -55,7 +55,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
dissector, ok := symDissector.(tapApi.Dissector)
if !ok {
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
return nil, errors.Errorf("Symbol Dissector type error: %v %T", file, symDissector)
}
dissector.Register(extension)
@@ -68,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v", extension)
}
return extensions, nil
@@ -92,7 +92,7 @@ func internalRun() error {
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
logger.Log.Infof("Tapping, press enter to exit...\n")
logger.Log.Infof("Tapping, press enter to exit...")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
@@ -104,9 +104,9 @@ func main() {
if err != nil {
switch err := err.(type) {
case *errors.Error:
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
logger.Log.Errorf("Error: %v", err.ErrorStack())
default:
logger.Log.Errorf("Error: %v\n", err)
logger.Log.Errorf("Error: %v", err)
}
os.Exit(1)

View File

@@ -133,3 +133,6 @@ ignore:
SNYK-JS-WS-1296835:
- '*':
reason: Non given
SNYK-JS-JSONSCHEMA-1920922:
- '*':
reason: Non given

6
ui/package-lock.json generated
View File

@@ -13644,9 +13644,9 @@
}
},
"react-scrollable-feed-virtualized": {
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz",
"integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ=="
"version": "1.4.8",
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.8.tgz",
"integrity": "sha512-zsSO/9QB+4V6HEk39lxeMEUA6JFSZjfV4stw7RF17+vZdlVhyATsTBCzsj8hZywY4F29cBfH+3/GKrMhwmhAsw=="
},
"react-syntax-highlighter": {
"version": "15.4.3",

View File

@@ -23,7 +23,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed-virtualized": "^1.4.3",
"react-scrollable-feed-virtualized": "^1.4.8",
"react-syntax-highlighter": "^15.4.3",
"react-toastify": "^8.0.3",
"typescript": "^4.2.4",

View File

@@ -1,4 +1,3 @@
import {EntryItem} from "./EntryListItem/EntryListItem";
import React, {useRef} from "react";
import styles from './style/EntriesList.module.sass';
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
@@ -6,40 +5,32 @@ import down from "./assets/downImg.svg";
interface EntriesListProps {
entries: any[];
setEntries: (entries: any[]) => void;
focusedEntryId: string;
setFocusedEntryId: (id: string) => void;
listEntryREF: any;
onScrollEvent: (isAtBottom:boolean) => void;
scrollableList: boolean;
ws: any
openWebSocket: any;
query: string;
updateQuery: any;
onSnapBrokenEvent: () => void;
isSnappedToBottom: boolean;
setIsSnappedToBottom: any;
queriedCurrent: number;
queriedTotal: number;
startTime: number;
}
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, listEntryREF, onScrollEvent, scrollableList, ws, openWebSocket, query, updateQuery, queriedCurrent, queriedTotal, startTime}) => {
export const EntriesList: React.FC<EntriesListProps> = ({entries, listEntryREF, onSnapBrokenEvent, isSnappedToBottom, setIsSnappedToBottom, queriedCurrent, queriedTotal, startTime}) => {
const scrollableRef = useRef(null);
return <>
<div className={styles.list}>
<div id="list" ref={listEntryREF} className={styles.list}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onSnapBroken={onSnapBrokenEvent}>
{false /* TODO: why there is a need for something here (not necessarily false)? */}
{entries.map(entry => <EntryItem key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
isSelected={focusedEntryId === entry.id.toString()}
style={{}}
updateQuery={updateQuery}/>)}
{entries}
</ScrollableFeedVirtualized>
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.jumpToBottom()}>
className={`${styles.btnLive} ${isSnappedToBottom ? styles.hideButton : styles.showButton}`}
onClick={(_) => {
scrollableRef.current.jumpToBottom();
setIsSnappedToBottom(true);
}}>
<img alt="down" src={down} />
</button>
</div>

View File

@@ -1,4 +1,4 @@
import React from "react";
import React, {useState} from "react";
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../UI/Protocol"
@@ -38,12 +38,14 @@ interface Rules {
interface EntryProps {
entry: Entry;
setFocusedEntryId: (id: string) => void;
isSelected?: boolean;
style: object;
updateQuery: any;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style, updateQuery}) => {
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style, updateQuery}) => {
const [isSelected, setIsSelected] = useState(false);
const classification = getClassification(entry.statusCode)
const numberOfRules = entry.rules.numberOfRules
let ingoingIcon;
@@ -119,7 +121,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
id={entry.id.toString()}
className={`${styles.row}
${isSelected && !rule && !contractEnabled ? styles.rowSelected : additionalRulesProperties}`}
onClick={() => setFocusedEntryId(entry.id.toString())}
onClick={() => {
setIsSelected(!isSelected);
setFocusedEntryId(entry.id.toString());
}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",

View File

@@ -1,6 +1,7 @@
import React, {useEffect, useRef, useState} from "react";
import {Filters} from "./Filters";
import {EntriesList} from "./EntriesList";
import {EntryItem} from "./EntryListItem/EntryListItem";
import {makeStyles} from "@material-ui/core";
import "./style/TrafficPage.sass";
import styles from './style/EntriesList.module.sass';
@@ -50,50 +51,58 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const classes = useLayoutStyles();
const [entries, setEntries] = useState([] as any);
const [entriesBuffer, setEntriesBuffer] = useState([] as any);
const [focusedEntryId, setFocusedEntryId] = useState(null);
const [selectedEntryData, setSelectedEntryData] = useState(null);
const [connection, setConnection] = useState(ConnectionStatus.Closed);
const [tappingStatus, setTappingStatus] = useState(null);
const [disableScrollList, setDisableScrollList] = useState(false);
const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
const [query, setQueryDefault] = useState("");
const [query, setQuery] = useState("");
const [queryBackgroundColor, setQueryBackgroundColor] = useState("#f5f5f5");
const [addition, updateQuery] = useState("");
const [queriedCurrent, setQueriedCurrent] = useState(0);
const [queriedTotal, setQueriedTotal] = useState(0);
const [startTime, setStartTime] = useState(0);
const setQuery = async (query) => {
if (!query) {
setQueryBackgroundColor("#f5f5f5")
} else {
const data = await api.validateQuery(query);
if (data.valid) {
setQueryBackgroundColor("#d2fad2")
useEffect(() => {
(async function() {
if (!query) {
setQueryBackgroundColor("#f5f5f5")
} else {
setQueryBackgroundColor("#fad6dc")
const data = await api.validateQuery(query);
if (!data) {
return;
}
if (data.valid) {
setQueryBackgroundColor("#d2fad2");
} else {
setQueryBackgroundColor("#fad6dc");
}
}
}
setQueryDefault(query)
}
})();
}, [query]);
const updateQuery = (addition) => {
useEffect(() => {
if (query) {
setQuery(`${query} and ${addition}`)
setQuery(`${query} and ${addition}`);
} else {
setQuery(addition)
setQuery(addition);
}
}
// eslint-disable-next-line
}, [addition]);
const ws = useRef(null);
const listEntry = useRef(null);
const openWebSocket = (query) => {
setEntries([])
setEntries([]);
setEntriesBuffer([]);
ws.current = new WebSocket(MizuWebsocketURL);
ws.current.onopen = () => {
ws.current.send(query)
@@ -108,15 +117,18 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data
if (!focusedEntryId) setFocusedEntryId(entry.id.toString())
let newEntries = [...entries];
setEntries([...newEntries, entry])
if(listEntry.current) {
if(isScrollable(listEntry.current.firstChild)) {
setDisableScrollList(true)
}
}
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
setEntriesBuffer([
...entriesBuffer,
<EntryItem
key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
style={{}}
updateQuery={updateQuery}
/>
]);
break
case "status":
setTappingStatus(message.tappingStatus);
@@ -140,8 +152,9 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
});
break;
case "queryMetadata":
setQueriedCurrent(message.data.current)
setQueriedTotal(message.data.total)
setQueriedCurrent(message.data.current);
setQueriedTotal(message.data.total);
setEntries(entriesBuffer);
break;
case "startTime":
setStartTime(message.data);
@@ -176,6 +189,16 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
const entryData = await api.getEntry(focusedEntryId);
setSelectedEntryData(entryData);
} catch (error) {
toast[error.response.data.type](`Entry[${focusedEntryId}]: ${error.response.data.msg}`, {
position: "bottom-right",
theme: "colored",
autoClose: error.response.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
console.error(error);
}
})()
@@ -209,21 +232,17 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
}
}
const onScrollEvent = (isAtBottom) => {
isAtBottom ? setDisableScrollList(false) : setDisableScrollList(true)
const onSnapBrokenEvent = () => {
setIsSnappedToBottom(false)
}
const isScrollable = (element) => {
return element.scrollHeight > element.clientHeight;
};
return (
<div className="TrafficPage">
<div className="TrafficPageHeader">
<img className="playPauseIcon" style={{visibility: connection === ConnectionStatus.Connected ? "visible" : "hidden"}} alt="pause"
src={pauseIcon} onClick={toggleConnection}/>
<img className="playPauseIcon" style={{position: "absolute", visibility: connection === ConnectionStatus.Connected ? "hidden" : "visible"}} alt="play"
src={playIcon} onClick={toggleConnection}/>
src={playIcon} onClick={toggleConnection}/>
<div className="connectionText">
{getConnectionTitle()}
<div className={"indicatorContainer " + getConnectionStatusClass(true)}>
@@ -243,16 +262,10 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
<div className={styles.container}>
<EntriesList
entries={entries}
setEntries={setEntries}
focusedEntryId={focusedEntryId}
setFocusedEntryId={setFocusedEntryId}
listEntryREF={listEntry}
onScrollEvent={onScrollEvent}
scrollableList={disableScrollList}
ws={ws.current}
openWebSocket={openWebSocket}
query={query}
updateQuery={updateQuery}
onSnapBrokenEvent={onSnapBrokenEvent}
isSnappedToBottom={isSnappedToBottom}
setIsSnappedToBottom={setIsSnappedToBottom}
queriedCurrent={queriedCurrent}
queriedTotal={queriedTotal}
startTime={startTime}

View File

@@ -3,6 +3,8 @@ import * as axios from "axios";
// When working locally cp `cp .env.example .env`
export const MizuWebsocketURL = process.env.REACT_APP_OVERRIDE_WS_URL ? process.env.REACT_APP_OVERRIDE_WS_URL : `ws://${window.location.host}/ws`;
const CancelToken = axios.CancelToken;
export default class Api {
constructor() {
@@ -17,6 +19,8 @@ export default class Api {
Accept: "application/json",
}
});
this.source = null;
}
tapStatus = async () => {
@@ -45,9 +49,25 @@ export default class Api {
}
validateQuery = async (query) => {
if (this.source) {
this.source.cancel();
}
this.source = CancelToken.source();
const form = new FormData();
form.append('query', query)
const response = await this.client.post(`/query/validate`, form);
const response = await this.client.post(`/query/validate`, form, {
cancelToken: this.source.token
}).catch(function (thrown) {
if (!axios.isCancel(thrown)) {
console.error('Validate error', thrown.message);
}
});
if (!response) {
return null;
}
return response.data;
}
}