mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-19 20:40:17 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f265dc4c5 | ||
|
|
1c75ce314b | ||
|
|
89836d8d75 | ||
|
|
763f72a640 | ||
|
|
a6ec246dd1 | ||
|
|
3e30815fb4 | ||
|
|
a6bf39fad5 | ||
|
|
58a1eac247 | ||
|
|
ad574956df | ||
|
|
618cb3a409 | ||
|
|
2582b7a65c |
@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
|
||||
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
|
||||
@@ -427,9 +427,10 @@ func TestTapRedact(t *testing.T) {
|
||||
}
|
||||
|
||||
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
|
||||
requestHeaders := map[string]string{"User-Header": "Mizu"}
|
||||
requestBody := map[string]string{"User": "Mizu"}
|
||||
for i := 0; i < defaultEntriesCount; i++ {
|
||||
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
|
||||
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
|
||||
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
||||
return
|
||||
}
|
||||
@@ -460,12 +461,12 @@ func TestTapRedact(t *testing.T) {
|
||||
headers := request["_headers"].([]interface{})
|
||||
for _, headerInterface := range headers {
|
||||
header := headerInterface.(map[string]interface{})
|
||||
if header["name"].(string) != "User-Agent" {
|
||||
if header["name"].(string) != "User-Header" {
|
||||
continue
|
||||
}
|
||||
|
||||
userAgent := header["value"].(string)
|
||||
if userAgent != "[REDACTED]" {
|
||||
userHeader := header["value"].(string)
|
||||
if userHeader != "[REDACTED]" {
|
||||
return fmt.Errorf("unexpected result - user agent is not redacted")
|
||||
}
|
||||
}
|
||||
@@ -530,9 +531,10 @@ func TestTapNoRedact(t *testing.T) {
|
||||
}
|
||||
|
||||
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
|
||||
requestHeaders := map[string]string{"User-Header": "Mizu"}
|
||||
requestBody := map[string]string{"User": "Mizu"}
|
||||
for i := 0; i < defaultEntriesCount; i++ {
|
||||
if _, requestErr := executeHttpPostRequest(fmt.Sprintf("%v/post", proxyUrl), requestBody); requestErr != nil {
|
||||
if _, requestErr := executeHttpPostRequestWithHeaders(fmt.Sprintf("%v/post", proxyUrl), requestHeaders, requestBody); requestErr != nil {
|
||||
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
||||
return
|
||||
}
|
||||
@@ -563,12 +565,12 @@ func TestTapNoRedact(t *testing.T) {
|
||||
headers := request["_headers"].([]interface{})
|
||||
for _, headerInterface := range headers {
|
||||
header := headerInterface.(map[string]interface{})
|
||||
if header["name"].(string) != "User-Agent" {
|
||||
if header["name"].(string) != "User-Header" {
|
||||
continue
|
||||
}
|
||||
|
||||
userAgent := header["value"].(string)
|
||||
if userAgent == "[REDACTED]" {
|
||||
userHeader := header["value"].(string)
|
||||
if userHeader == "[REDACTED]" {
|
||||
return fmt.Errorf("unexpected result - user agent is redacted")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,13 +240,24 @@ func executeHttpGetRequest(url string) (interface{}, error) {
|
||||
return executeHttpRequest(response, requestErr)
|
||||
}
|
||||
|
||||
func executeHttpPostRequest(url string, body interface{}) (interface{}, error) {
|
||||
func executeHttpPostRequestWithHeaders(url string, headers map[string]string, body interface{}) (interface{}, error) {
|
||||
requestBody, jsonErr := json.Marshal(body)
|
||||
if jsonErr != nil {
|
||||
return nil, jsonErr
|
||||
}
|
||||
|
||||
response, requestErr := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
|
||||
request, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestBody))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
request.Header.Add("Content-Type", "application/json")
|
||||
for headerKey, headerValue := range headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
|
||||
client := &http.Client{}
|
||||
response, requestErr := client.Do(request)
|
||||
return executeHttpRequest(response, requestErr)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ require (
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084 h1:gLoP7AyS/c6pYuBQOgALWpzzc5/aSrq98Lr49JRfmfs=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211109233221-12b405471084/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5 h1:JbLairDLEJpAC8bwmFuOAB+LYpY/oQbzGRSWRpkF7PQ=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
|
||||
@@ -207,7 +207,7 @@ func loadExtensions() {
|
||||
extensionsMap = make(map[string]*tapApi.Extension)
|
||||
for i, file := range files {
|
||||
filename := file.Name()
|
||||
logger.Log.Infof("Loading extension: %s\n", filename)
|
||||
logger.Log.Infof("Loading extension: %s", filename)
|
||||
extension := &tapApi.Extension{
|
||||
Path: path.Join(extensionsDir, filename),
|
||||
}
|
||||
@@ -219,7 +219,7 @@ func loadExtensions() {
|
||||
var ok bool
|
||||
dissector, ok = symDissector.(tapApi.Dissector)
|
||||
if err != nil || !ok {
|
||||
panic(fmt.Sprintf("Failed to load the extension: %s\n", extension.Path))
|
||||
panic(fmt.Sprintf("Failed to load the extension: %s", extension.Path))
|
||||
}
|
||||
dissector.Register(extension)
|
||||
extension.Dissector = dissector
|
||||
@@ -232,7 +232,7 @@ func loadExtensions() {
|
||||
})
|
||||
|
||||
for _, extension := range extensions {
|
||||
logger.Log.Infof("Extension Properties: %+v\n", extension)
|
||||
logger.Log.Infof("Extension Properties: %+v", extension)
|
||||
}
|
||||
|
||||
controllers.InitExtensionsMap(extensionsMap)
|
||||
@@ -459,7 +459,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
|
||||
return
|
||||
}
|
||||
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
|
||||
case _, ok := <-tapperSyncer.TapPodChangesOut:
|
||||
case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut:
|
||||
if !ok {
|
||||
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
|
||||
return
|
||||
@@ -472,6 +472,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
|
||||
}
|
||||
api.BroadcastToBrowserClients(serializedTapStatus)
|
||||
providers.TapStatus.Pods = tapStatus.Pods
|
||||
providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||
return
|
||||
|
||||
@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
|
||||
sort.Sort(utils.ByModTime(harFiles))
|
||||
|
||||
if len(harFiles) == 0 {
|
||||
logger.Log.Infof("Waiting for new files\n")
|
||||
logger.Log.Infof("Waiting for new files")
|
||||
time.Sleep(3 * time.Second)
|
||||
continue
|
||||
}
|
||||
@@ -109,7 +109,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
ctx := context.Background()
|
||||
doc, contractContent, router, err := loadOAS(ctx)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Disabled OAS validation: %s\n", err.Error())
|
||||
logger.Log.Infof("Disabled OAS validation: %s", err.Error())
|
||||
disableOASValidation = true
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
|
||||
unresolvedSource := connectionInfo.ClientIP
|
||||
resolvedSource = k8sResolver.Resolve(unresolvedSource)
|
||||
if resolvedSource == "" {
|
||||
logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
|
||||
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
|
||||
return
|
||||
}
|
||||
@@ -162,7 +162,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
|
||||
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
|
||||
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
|
||||
if resolvedDestination == "" {
|
||||
logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
|
||||
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
var metadata *basenine.Metadata
|
||||
err = json.Unmarshal(bytes, &metadata)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error recieving metadata: %v\n", err.Error())
|
||||
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
|
||||
}
|
||||
|
||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||
@@ -167,7 +167,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
err := socketConnection.connection.Close()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
|
||||
logger.Log.Errorf("Error closing socket connection for socket id %d: %v", socketId, err)
|
||||
}
|
||||
|
||||
websocketIdsLock.Lock()
|
||||
|
||||
@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var socketMessageBase shared.WebSocketMessageMetadata
|
||||
err := json.Unmarshal(message, &socketMessageBase)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
|
||||
logger.Log.Infof("Could not unmarshal websocket message %v", err)
|
||||
} else {
|
||||
switch socketMessageBase.MessageType {
|
||||
case shared.WebSocketMessageTypeTappedEntry:
|
||||
var tappedEntryMessage models.WebSocketTappedEntryMessage
|
||||
err := json.Unmarshal(message, &tappedEntryMessage)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
|
||||
h.SocketOutChannel <- tappedEntryMessage.Data
|
||||
@@ -81,7 +81,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var statusMessage shared.WebSocketStatusMessage
|
||||
err := json.Unmarshal(message, &statusMessage)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
|
||||
BroadcastToBrowserClients(message)
|
||||
@@ -90,7 +90,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var outboundLinkMessage models.WebsocketOutboundLinkMessage
|
||||
err := json.Unmarshal(message, &outboundLinkMessage)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
handleTLSLink(outboundLinkMessage)
|
||||
}
|
||||
|
||||
@@ -25,7 +25,12 @@ func Error(c *gin.Context, err error) bool {
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error getting entry: %v", err)
|
||||
c.Error(err)
|
||||
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": true, "msg": err.Error()})
|
||||
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
|
||||
"error": true,
|
||||
"type": "error",
|
||||
"autoClose": "5000",
|
||||
"msg": err.Error(),
|
||||
})
|
||||
return true // signal that there was an error and the caller should return
|
||||
}
|
||||
return false // no error, can continue
|
||||
@@ -39,7 +44,13 @@ func GetEntry(c *gin.Context) {
|
||||
return // exit
|
||||
}
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if Error(c, err) {
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{
|
||||
"error": true,
|
||||
"type": "error",
|
||||
"autoClose": "5000",
|
||||
"msg": string(bytes),
|
||||
})
|
||||
return // exit
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,9 @@ package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/config"
|
||||
"mizuserver/pkg/holder"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/up9"
|
||||
@@ -15,6 +17,13 @@ import (
|
||||
)
|
||||
|
||||
func HealthCheck(c *gin.Context) {
|
||||
if config.Config.DaemonMode {
|
||||
if providers.ExpectedTapperAmount != providers.TappersCount {
|
||||
c.JSON(http.StatusInternalServerError, fmt.Sprintf("expecting more tappers than are actually connected (%d expected, %d connected)", providers.ExpectedTapperAmount, providers.TappersCount))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
response := shared.HealthResponse{
|
||||
TapStatus: providers.TapStatus,
|
||||
TappersCount: providers.TappersCount,
|
||||
@@ -37,7 +46,7 @@ func PostTappedPods(c *gin.Context) {
|
||||
providers.TapStatus.Pods = tapStatus.Pods
|
||||
message := shared.CreateWebSocketStatusMessage(*tapStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
logger.Log.Errorf("Could not Marshal message %v\n", err)
|
||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||
} else {
|
||||
api.BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ var (
|
||||
TapStatus shared.TapStatus
|
||||
authStatus *models.AuthStatus
|
||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||
|
||||
ExpectedTapperAmount = -1 //only relevant in daemon mode as cli manages tappers otherwise
|
||||
tappersCountLock = sync.Mutex{}
|
||||
)
|
||||
|
||||
|
||||
@@ -164,10 +164,10 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
resolver.nameMap.Remove(key)
|
||||
logger.Log.Infof("setting %s=nil\n", key)
|
||||
logger.Log.Infof("setting %s=nil", key)
|
||||
} else {
|
||||
resolver.nameMap.Set(key, resolved)
|
||||
logger.Log.Infof("setting %s=%s\n", key, resolved)
|
||||
logger.Log.Infof("setting %s=%s", key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
|
||||
var statusError *k8serrors.StatusError
|
||||
if errors.As(err, &statusError) {
|
||||
if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
|
||||
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
|
||||
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,14 +112,14 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
|
||||
}
|
||||
|
||||
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
logger.Log.Infof("Sync entries - started\n")
|
||||
logger.Log.Infof("Sync entries - started")
|
||||
|
||||
var (
|
||||
token, model string
|
||||
guestMode bool
|
||||
)
|
||||
if syncEntriesConfig.Token == "" {
|
||||
logger.Log.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
|
||||
logger.Log.Infof("Sync entries - creating anonymous token. env %s", syncEntriesConfig.Env)
|
||||
guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed creating anonymous token, err: %v", err)
|
||||
@@ -133,7 +133,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
model = syncEntriesConfig.Workspace
|
||||
guestMode = false
|
||||
|
||||
logger.Log.Infof("Sync entries - upserting model. env %s, model %s\n", syncEntriesConfig.Env, model)
|
||||
logger.Log.Infof("Sync entries - upserting model. env %s, model %s", syncEntriesConfig.Env, model)
|
||||
if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
|
||||
return fmt.Errorf("failed upserting model, err: %v", err)
|
||||
}
|
||||
@@ -144,7 +144,7 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
return fmt.Errorf("invalid model name, model name: %s", model)
|
||||
}
|
||||
|
||||
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
|
||||
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v", token, model, guestMode)
|
||||
go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)
|
||||
|
||||
return nil
|
||||
@@ -209,7 +209,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
// "http or grpc" filter indicates that we're only interested in HTTP and gRPC entries
|
||||
query := "http or grpc"
|
||||
|
||||
logger.Log.Infof("Getting entries from the database\n")
|
||||
logger.Log.Infof("Getting entries from the database")
|
||||
|
||||
var connection *basenine.Connection
|
||||
var err error
|
||||
|
||||
@@ -282,7 +282,7 @@ func createMizuResources(ctx context.Context, cancel context.CancelFunc, kuberne
|
||||
}
|
||||
|
||||
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v", errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
var err error
|
||||
@@ -441,7 +441,7 @@ func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provid
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
||||
logger.Log.Infof("\nRemoving mizu resources\n")
|
||||
logger.Log.Infof("\nRemoving mizu resources")
|
||||
|
||||
var leftoverResources []string
|
||||
|
||||
@@ -626,7 +626,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
break
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s\n", url)
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s\n", url)
|
||||
logger.Log.Infof("Mizu is available at %s", url)
|
||||
|
||||
if !config.Config.HeadlessMode {
|
||||
uiUtils.OpenBrowser(url)
|
||||
|
||||
@@ -78,6 +78,6 @@ func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath strin
|
||||
logger.Log.Debugf("Successfully added file %s", GetLogFilePath())
|
||||
}
|
||||
|
||||
logger.Log.Infof("You can find the zip file with all logs in %s\n", filePath)
|
||||
logger.Log.Infof("You can find the zip file with all logs in %s", filePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -37,8 +37,8 @@ COPY agent .
|
||||
RUN go build -gcflags="all=-N -l" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.6/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
|
||||
RUN shasum -a 256 -c basenine_linux_amd64.sha256
|
||||
RUN chmod +x ./basenine_linux_amd64
|
||||
|
||||
|
||||
@@ -16,18 +16,20 @@ import (
|
||||
const updateTappersDelay = 5 * time.Second
|
||||
|
||||
type TappedPodChangeEvent struct {
|
||||
Added []core.Pod
|
||||
Removed []core.Pod
|
||||
Added []core.Pod
|
||||
Removed []core.Pod
|
||||
ExpectedTapperAmount int
|
||||
}
|
||||
|
||||
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
|
||||
type MizuTapperSyncer struct {
|
||||
context context.Context
|
||||
CurrentlyTappedPods []core.Pod
|
||||
config TapperSyncerConfig
|
||||
kubernetesProvider *Provider
|
||||
TapPodChangesOut chan TappedPodChangeEvent
|
||||
ErrorOut chan K8sTapManagerError
|
||||
context context.Context
|
||||
CurrentlyTappedPods []core.Pod
|
||||
config TapperSyncerConfig
|
||||
kubernetesProvider *Provider
|
||||
TapPodChangesOut chan TappedPodChangeEvent
|
||||
ErrorOut chan K8sTapManagerError
|
||||
nodeToTappedPodIPMap map[string][]string
|
||||
}
|
||||
|
||||
type TapperSyncerConfig struct {
|
||||
@@ -160,9 +162,11 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
|
||||
}
|
||||
if len(addedPods) > 0 || len(removedPods) > 0 {
|
||||
tapperSyncer.CurrentlyTappedPods = podsToTap
|
||||
tapperSyncer.nodeToTappedPodIPMap = GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
|
||||
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
|
||||
Added: addedPods,
|
||||
Removed: removedPods,
|
||||
Added: addedPods,
|
||||
Removed: removedPods,
|
||||
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodIPMap),
|
||||
}
|
||||
return nil, true
|
||||
}
|
||||
@@ -171,9 +175,7 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
|
||||
}
|
||||
|
||||
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
nodeToTappedPodIPMap := GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
|
||||
|
||||
if len(nodeToTappedPodIPMap) > 0 {
|
||||
if len(tapperSyncer.nodeToTappedPodIPMap) > 0 {
|
||||
var serviceAccountName string
|
||||
if tapperSyncer.config.MizuServiceAccountExists {
|
||||
serviceAccountName = ServiceAccountName
|
||||
@@ -188,7 +190,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.AgentImage,
|
||||
TapperPodName,
|
||||
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
|
||||
nodeToTappedPodIPMap,
|
||||
tapperSyncer.nodeToTappedPodIPMap,
|
||||
serviceAccountName,
|
||||
tapperSyncer.config.TapperResources,
|
||||
tapperSyncer.config.ImagePullPolicy,
|
||||
@@ -197,7 +199,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
|
||||
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodIPMap))
|
||||
} else {
|
||||
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
|
||||
return err
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
var Log = logging.MustGetLogger("mizu")
|
||||
|
||||
var format = logging.MustStringFormatter(
|
||||
`%{time} %{level:.5s} ▶ %{pid} %{shortfile} %{shortfunc} ▶ %{message}`,
|
||||
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
|
||||
)
|
||||
|
||||
func InitLogger(logPath string) {
|
||||
|
||||
@@ -2,9 +2,9 @@ package shared
|
||||
|
||||
import (
|
||||
"github.com/op/go-logging"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
@@ -141,14 +141,12 @@ func (r *RulePolicy) validateType() bool {
|
||||
permitedTypes := []string{"json", "header", "slo"}
|
||||
_, found := Find(permitedTypes, r.Type)
|
||||
if !found {
|
||||
log.Printf("Error: %s. ", r.Name)
|
||||
log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n")
|
||||
logger.Log.Errorf("Only json, header and slo types are supported on rule definition. This rule will be ignored. rule name: %s", r.Name)
|
||||
found = false
|
||||
}
|
||||
if strings.ToLower(r.Type) == "slo" {
|
||||
if r.ResponseTime <= 0 {
|
||||
log.Printf("Error: %s. ", r.Name)
|
||||
log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n")
|
||||
logger.Log.Errorf("When rule type is slo, the field response-time should be specified and have a value >= 1. rule name: %s", r.Name)
|
||||
found = false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,7 +287,7 @@ func (h HTTPPayload) MarshalJSON() ([]byte, error) {
|
||||
RawResponse: &HTTPResponseWrapper{Response: h.Data.(*http.Response)},
|
||||
})
|
||||
default:
|
||||
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s\n", h.Type))
|
||||
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s", h.Type))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
logger.Log.Infof("Writing memory profile to %s\n", filename)
|
||||
logger.Log.Infof("Writing memory profile to %s", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
|
||||
@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
|
||||
const amqpRequest string = "amqp_request"
|
||||
@@ -218,7 +218,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
default:
|
||||
// log.Printf("unexpected frame: %+v\n", f)
|
||||
// log.Printf("unexpected frame: %+v", f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ func (d dissecting) Register(extension *api.Extension) {
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
@@ -143,6 +143,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
}
|
||||
|
||||
if resDetails["bodySize"].(float64) < 0 {
|
||||
resDetails["bodySize"] = 0
|
||||
}
|
||||
|
||||
if item.Protocol.Version == "2.0" {
|
||||
service = fmt.Sprintf("%s://%s", scheme, authority)
|
||||
} else {
|
||||
@@ -180,6 +184,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
}
|
||||
httpPair, _ := json.Marshal(item.Pair)
|
||||
_protocol := protocol
|
||||
_protocol.Version = item.Protocol.Version
|
||||
@@ -418,7 +425,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`http`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
|
||||
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
|
||||
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
|
||||
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
)
|
||||
|
||||
const maskedFieldPlaceholderValue = "[REDACTED]"
|
||||
const userAgent = "user-agent"
|
||||
|
||||
//these values MUST be all lower case and contain no `-` or `_` characters
|
||||
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
|
||||
@@ -32,7 +33,7 @@ func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteri
|
||||
request := item.Pair.Request.Payload.(api.HTTPPayload).Data.(*http.Request)
|
||||
|
||||
for headerKey, headerValues := range request.Header {
|
||||
if strings.ToLower(headerKey) == "user-agent" {
|
||||
if strings.ToLower(headerKey) == userAgent {
|
||||
for _, userAgent := range options.IgnoredUserAgents {
|
||||
for _, headerValue := range headerValues {
|
||||
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
|
||||
@@ -89,6 +90,10 @@ func filterResponseBody(response *http.Response, options *api.TrafficFilteringOp
|
||||
|
||||
func filterHeaders(headers *http.Header) {
|
||||
for key, _ := range *headers {
|
||||
if strings.ToLower(key) == userAgent {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.ToLower(key) == "cookie" {
|
||||
headers.Del(key)
|
||||
} else if isFieldNameSensitive(key) {
|
||||
|
||||
@@ -37,7 +37,7 @@ func (d dissecting) Register(extension *api.Extension) {
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", _protocol.Name)
|
||||
log.Printf("pong %s", _protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
@@ -149,6 +149,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
}
|
||||
return &api.MizuEntry{
|
||||
Protocol: _protocol,
|
||||
Source: &api.TCP{
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@@ -35,7 +36,7 @@ func (d dissecting) Register(extension *api.Extension) {
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
@@ -82,6 +83,10 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
}
|
||||
return &api.MizuEntry{
|
||||
Protocol: protocol,
|
||||
Source: &api.TCP{
|
||||
@@ -104,7 +109,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: 0,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
|
||||
@@ -313,7 +313,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
|
||||
packet.Value = fmt.Sprintf("%s]", packet.Value)
|
||||
}
|
||||
default:
|
||||
msg := fmt.Sprintf("Unrecognized element in Redis array: %v\n", reflect.TypeOf(array[0]))
|
||||
msg := fmt.Sprintf("Unrecognized element in Redis array: %v", reflect.TypeOf(array[0]))
|
||||
err = errors.New(msg)
|
||||
return
|
||||
}
|
||||
@@ -333,7 +333,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
|
||||
case int64:
|
||||
packet.Value = fmt.Sprintf("%d", x.(int64))
|
||||
default:
|
||||
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
|
||||
msg := fmt.Sprintf("Unrecognized Redis data type: %v", reflect.TypeOf(x))
|
||||
err = errors.New(msg)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
}
|
||||
|
||||
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
|
||||
logger.Log.Errorf("Error dumping memory profile %v\n", err)
|
||||
logger.Log.Errorf("Error dumping memory profile %v", err)
|
||||
}
|
||||
|
||||
assembler.waitAndDump()
|
||||
|
||||
@@ -78,7 +78,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
|
||||
if *checksum {
|
||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||
if err != nil {
|
||||
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
|
||||
logger.Log.Fatalf("Failed to set network layer for checksum: %s", err)
|
||||
}
|
||||
}
|
||||
c := context{
|
||||
|
||||
@@ -66,7 +66,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
clientHello := tlsx.ClientHello{}
|
||||
err := clientHello.Unmarshall(msg.bytes)
|
||||
if err == nil {
|
||||
logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
|
||||
logger.Log.Debugf("Detected TLS client hello with SNI %s", clientHello.SNI)
|
||||
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
|
||||
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
|
||||
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
|
||||
|
||||
@@ -46,7 +46,7 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
|
||||
stream.Close()
|
||||
diagnose.AppStats.IncDroppedTcpStreams()
|
||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
|
||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
|
||||
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -33,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Infof("Loading extension: %s\n", filename)
|
||||
logger.Log.Infof("Loading extension: %s", filename)
|
||||
|
||||
extension := &tapApi.Extension{
|
||||
Path: path.Join(extensionsDir, filename),
|
||||
@@ -55,7 +55,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
|
||||
dissector, ok := symDissector.(tapApi.Dissector)
|
||||
|
||||
if !ok {
|
||||
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
|
||||
return nil, errors.Errorf("Symbol Dissector type error: %v %T", file, symDissector)
|
||||
}
|
||||
|
||||
dissector.Register(extension)
|
||||
@@ -68,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
|
||||
})
|
||||
|
||||
for _, extension := range extensions {
|
||||
logger.Log.Infof("Extension Properties: %+v\n", extension)
|
||||
logger.Log.Infof("Extension Properties: %+v", extension)
|
||||
}
|
||||
|
||||
return extensions, nil
|
||||
@@ -92,7 +92,7 @@ func internalRun() error {
|
||||
|
||||
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
|
||||
|
||||
logger.Log.Infof("Tapping, press enter to exit...\n")
|
||||
logger.Log.Infof("Tapping, press enter to exit...")
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
reader.ReadLine()
|
||||
return nil
|
||||
@@ -104,9 +104,9 @@ func main() {
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *errors.Error:
|
||||
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
|
||||
logger.Log.Errorf("Error: %v", err.ErrorStack())
|
||||
default:
|
||||
logger.Log.Errorf("Error: %v\n", err)
|
||||
logger.Log.Errorf("Error: %v", err)
|
||||
}
|
||||
|
||||
os.Exit(1)
|
||||
|
||||
3
ui/.snyk
3
ui/.snyk
@@ -133,3 +133,6 @@ ignore:
|
||||
SNYK-JS-WS-1296835:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-JSONSCHEMA-1920922:
|
||||
- '*':
|
||||
reason: Non given
|
||||
|
||||
6
ui/package-lock.json
generated
6
ui/package-lock.json
generated
@@ -13644,9 +13644,9 @@
|
||||
}
|
||||
},
|
||||
"react-scrollable-feed-virtualized": {
|
||||
"version": "1.4.3",
|
||||
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz",
|
||||
"integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ=="
|
||||
"version": "1.4.8",
|
||||
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.8.tgz",
|
||||
"integrity": "sha512-zsSO/9QB+4V6HEk39lxeMEUA6JFSZjfV4stw7RF17+vZdlVhyATsTBCzsj8hZywY4F29cBfH+3/GKrMhwmhAsw=="
|
||||
},
|
||||
"react-syntax-highlighter": {
|
||||
"version": "15.4.3",
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
"react-copy-to-clipboard": "^5.0.3",
|
||||
"react-dom": "^17.0.2",
|
||||
"react-scripts": "4.0.3",
|
||||
"react-scrollable-feed-virtualized": "^1.4.3",
|
||||
"react-scrollable-feed-virtualized": "^1.4.8",
|
||||
"react-syntax-highlighter": "^15.4.3",
|
||||
"react-toastify": "^8.0.3",
|
||||
"typescript": "^4.2.4",
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import {EntryItem} from "./EntryListItem/EntryListItem";
|
||||
import React, {useRef} from "react";
|
||||
import styles from './style/EntriesList.module.sass';
|
||||
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
|
||||
@@ -6,40 +5,32 @@ import down from "./assets/downImg.svg";
|
||||
|
||||
interface EntriesListProps {
|
||||
entries: any[];
|
||||
setEntries: (entries: any[]) => void;
|
||||
focusedEntryId: string;
|
||||
setFocusedEntryId: (id: string) => void;
|
||||
listEntryREF: any;
|
||||
onScrollEvent: (isAtBottom:boolean) => void;
|
||||
scrollableList: boolean;
|
||||
ws: any
|
||||
openWebSocket: any;
|
||||
query: string;
|
||||
updateQuery: any;
|
||||
onSnapBrokenEvent: () => void;
|
||||
isSnappedToBottom: boolean;
|
||||
setIsSnappedToBottom: any;
|
||||
queriedCurrent: number;
|
||||
queriedTotal: number;
|
||||
startTime: number;
|
||||
}
|
||||
|
||||
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, listEntryREF, onScrollEvent, scrollableList, ws, openWebSocket, query, updateQuery, queriedCurrent, queriedTotal, startTime}) => {
|
||||
export const EntriesList: React.FC<EntriesListProps> = ({entries, listEntryREF, onSnapBrokenEvent, isSnappedToBottom, setIsSnappedToBottom, queriedCurrent, queriedTotal, startTime}) => {
|
||||
|
||||
const scrollableRef = useRef(null);
|
||||
|
||||
return <>
|
||||
<div className={styles.list}>
|
||||
<div id="list" ref={listEntryREF} className={styles.list}>
|
||||
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
|
||||
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onSnapBroken={onSnapBrokenEvent}>
|
||||
{false /* TODO: why there is a need for something here (not necessarily false)? */}
|
||||
{entries.map(entry => <EntryItem key={entry.id}
|
||||
entry={entry}
|
||||
setFocusedEntryId={setFocusedEntryId}
|
||||
isSelected={focusedEntryId === entry.id.toString()}
|
||||
style={{}}
|
||||
updateQuery={updateQuery}/>)}
|
||||
{entries}
|
||||
</ScrollableFeedVirtualized>
|
||||
<button type="button"
|
||||
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
|
||||
onClick={(_) => scrollableRef.current.jumpToBottom()}>
|
||||
className={`${styles.btnLive} ${isSnappedToBottom ? styles.hideButton : styles.showButton}`}
|
||||
onClick={(_) => {
|
||||
scrollableRef.current.jumpToBottom();
|
||||
setIsSnappedToBottom(true);
|
||||
}}>
|
||||
<img alt="down" src={down} />
|
||||
</button>
|
||||
</div>
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import React from "react";
|
||||
import React, {useState} from "react";
|
||||
import styles from './EntryListItem.module.sass';
|
||||
import StatusCode, {getClassification, StatusCodeClassification} from "../UI/StatusCode";
|
||||
import Protocol, {ProtocolInterface} from "../UI/Protocol"
|
||||
@@ -38,12 +38,14 @@ interface Rules {
|
||||
interface EntryProps {
|
||||
entry: Entry;
|
||||
setFocusedEntryId: (id: string) => void;
|
||||
isSelected?: boolean;
|
||||
style: object;
|
||||
updateQuery: any;
|
||||
}
|
||||
|
||||
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style, updateQuery}) => {
|
||||
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style, updateQuery}) => {
|
||||
|
||||
const [isSelected, setIsSelected] = useState(false);
|
||||
|
||||
const classification = getClassification(entry.statusCode)
|
||||
const numberOfRules = entry.rules.numberOfRules
|
||||
let ingoingIcon;
|
||||
@@ -119,7 +121,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
|
||||
id={entry.id.toString()}
|
||||
className={`${styles.row}
|
||||
${isSelected && !rule && !contractEnabled ? styles.rowSelected : additionalRulesProperties}`}
|
||||
onClick={() => setFocusedEntryId(entry.id.toString())}
|
||||
onClick={() => {
|
||||
setIsSelected(!isSelected);
|
||||
setFocusedEntryId(entry.id.toString());
|
||||
}}
|
||||
style={{
|
||||
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
|
||||
position: "absolute",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import React, {useEffect, useRef, useState} from "react";
|
||||
import {Filters} from "./Filters";
|
||||
import {EntriesList} from "./EntriesList";
|
||||
import {EntryItem} from "./EntryListItem/EntryListItem";
|
||||
import {makeStyles} from "@material-ui/core";
|
||||
import "./style/TrafficPage.sass";
|
||||
import styles from './style/EntriesList.module.sass';
|
||||
@@ -50,50 +51,58 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
const classes = useLayoutStyles();
|
||||
|
||||
const [entries, setEntries] = useState([] as any);
|
||||
const [entriesBuffer, setEntriesBuffer] = useState([] as any);
|
||||
const [focusedEntryId, setFocusedEntryId] = useState(null);
|
||||
const [selectedEntryData, setSelectedEntryData] = useState(null);
|
||||
const [connection, setConnection] = useState(ConnectionStatus.Closed);
|
||||
|
||||
const [tappingStatus, setTappingStatus] = useState(null);
|
||||
|
||||
const [disableScrollList, setDisableScrollList] = useState(false);
|
||||
const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
|
||||
|
||||
const [query, setQueryDefault] = useState("");
|
||||
const [query, setQuery] = useState("");
|
||||
const [queryBackgroundColor, setQueryBackgroundColor] = useState("#f5f5f5");
|
||||
const [addition, updateQuery] = useState("");
|
||||
|
||||
const [queriedCurrent, setQueriedCurrent] = useState(0);
|
||||
const [queriedTotal, setQueriedTotal] = useState(0);
|
||||
|
||||
const [startTime, setStartTime] = useState(0);
|
||||
|
||||
const setQuery = async (query) => {
|
||||
if (!query) {
|
||||
setQueryBackgroundColor("#f5f5f5")
|
||||
} else {
|
||||
const data = await api.validateQuery(query);
|
||||
if (data.valid) {
|
||||
setQueryBackgroundColor("#d2fad2")
|
||||
useEffect(() => {
|
||||
(async function() {
|
||||
if (!query) {
|
||||
setQueryBackgroundColor("#f5f5f5")
|
||||
} else {
|
||||
setQueryBackgroundColor("#fad6dc")
|
||||
const data = await api.validateQuery(query);
|
||||
if (!data) {
|
||||
return;
|
||||
}
|
||||
if (data.valid) {
|
||||
setQueryBackgroundColor("#d2fad2");
|
||||
} else {
|
||||
setQueryBackgroundColor("#fad6dc");
|
||||
}
|
||||
}
|
||||
}
|
||||
setQueryDefault(query)
|
||||
}
|
||||
})();
|
||||
}, [query]);
|
||||
|
||||
const updateQuery = (addition) => {
|
||||
useEffect(() => {
|
||||
if (query) {
|
||||
setQuery(`${query} and ${addition}`)
|
||||
setQuery(`${query} and ${addition}`);
|
||||
} else {
|
||||
setQuery(addition)
|
||||
setQuery(addition);
|
||||
}
|
||||
}
|
||||
// eslint-disable-next-line
|
||||
}, [addition]);
|
||||
|
||||
const ws = useRef(null);
|
||||
|
||||
const listEntry = useRef(null);
|
||||
|
||||
const openWebSocket = (query) => {
|
||||
setEntries([])
|
||||
setEntries([]);
|
||||
setEntriesBuffer([]);
|
||||
ws.current = new WebSocket(MizuWebsocketURL);
|
||||
ws.current.onopen = () => {
|
||||
ws.current.send(query)
|
||||
@@ -108,15 +117,18 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
const message = JSON.parse(e.data);
|
||||
switch (message.messageType) {
|
||||
case "entry":
|
||||
const entry = message.data
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id.toString())
|
||||
let newEntries = [...entries];
|
||||
setEntries([...newEntries, entry])
|
||||
if(listEntry.current) {
|
||||
if(isScrollable(listEntry.current.firstChild)) {
|
||||
setDisableScrollList(true)
|
||||
}
|
||||
}
|
||||
const entry = message.data;
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
|
||||
setEntriesBuffer([
|
||||
...entriesBuffer,
|
||||
<EntryItem
|
||||
key={entry.id}
|
||||
entry={entry}
|
||||
setFocusedEntryId={setFocusedEntryId}
|
||||
style={{}}
|
||||
updateQuery={updateQuery}
|
||||
/>
|
||||
]);
|
||||
break
|
||||
case "status":
|
||||
setTappingStatus(message.tappingStatus);
|
||||
@@ -140,8 +152,9 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
});
|
||||
break;
|
||||
case "queryMetadata":
|
||||
setQueriedCurrent(message.data.current)
|
||||
setQueriedTotal(message.data.total)
|
||||
setQueriedCurrent(message.data.current);
|
||||
setQueriedTotal(message.data.total);
|
||||
setEntries(entriesBuffer);
|
||||
break;
|
||||
case "startTime":
|
||||
setStartTime(message.data);
|
||||
@@ -176,6 +189,16 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
const entryData = await api.getEntry(focusedEntryId);
|
||||
setSelectedEntryData(entryData);
|
||||
} catch (error) {
|
||||
toast[error.response.data.type](`Entry[${focusedEntryId}]: ${error.response.data.msg}`, {
|
||||
position: "bottom-right",
|
||||
theme: "colored",
|
||||
autoClose: error.response.data.autoClose,
|
||||
hideProgressBar: false,
|
||||
closeOnClick: true,
|
||||
pauseOnHover: true,
|
||||
draggable: true,
|
||||
progress: undefined,
|
||||
});
|
||||
console.error(error);
|
||||
}
|
||||
})()
|
||||
@@ -209,21 +232,17 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
}
|
||||
}
|
||||
|
||||
const onScrollEvent = (isAtBottom) => {
|
||||
isAtBottom ? setDisableScrollList(false) : setDisableScrollList(true)
|
||||
const onSnapBrokenEvent = () => {
|
||||
setIsSnappedToBottom(false)
|
||||
}
|
||||
|
||||
const isScrollable = (element) => {
|
||||
return element.scrollHeight > element.clientHeight;
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="TrafficPage">
|
||||
<div className="TrafficPageHeader">
|
||||
<img className="playPauseIcon" style={{visibility: connection === ConnectionStatus.Connected ? "visible" : "hidden"}} alt="pause"
|
||||
src={pauseIcon} onClick={toggleConnection}/>
|
||||
<img className="playPauseIcon" style={{position: "absolute", visibility: connection === ConnectionStatus.Connected ? "hidden" : "visible"}} alt="play"
|
||||
src={playIcon} onClick={toggleConnection}/>
|
||||
src={playIcon} onClick={toggleConnection}/>
|
||||
<div className="connectionText">
|
||||
{getConnectionTitle()}
|
||||
<div className={"indicatorContainer " + getConnectionStatusClass(true)}>
|
||||
@@ -243,16 +262,10 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
<div className={styles.container}>
|
||||
<EntriesList
|
||||
entries={entries}
|
||||
setEntries={setEntries}
|
||||
focusedEntryId={focusedEntryId}
|
||||
setFocusedEntryId={setFocusedEntryId}
|
||||
listEntryREF={listEntry}
|
||||
onScrollEvent={onScrollEvent}
|
||||
scrollableList={disableScrollList}
|
||||
ws={ws.current}
|
||||
openWebSocket={openWebSocket}
|
||||
query={query}
|
||||
updateQuery={updateQuery}
|
||||
onSnapBrokenEvent={onSnapBrokenEvent}
|
||||
isSnappedToBottom={isSnappedToBottom}
|
||||
setIsSnappedToBottom={setIsSnappedToBottom}
|
||||
queriedCurrent={queriedCurrent}
|
||||
queriedTotal={queriedTotal}
|
||||
startTime={startTime}
|
||||
|
||||
@@ -3,6 +3,8 @@ import * as axios from "axios";
|
||||
// When working locally cp `cp .env.example .env`
|
||||
export const MizuWebsocketURL = process.env.REACT_APP_OVERRIDE_WS_URL ? process.env.REACT_APP_OVERRIDE_WS_URL : `ws://${window.location.host}/ws`;
|
||||
|
||||
const CancelToken = axios.CancelToken;
|
||||
|
||||
export default class Api {
|
||||
|
||||
constructor() {
|
||||
@@ -17,6 +19,8 @@ export default class Api {
|
||||
Accept: "application/json",
|
||||
}
|
||||
});
|
||||
|
||||
this.source = null;
|
||||
}
|
||||
|
||||
tapStatus = async () => {
|
||||
@@ -45,9 +49,25 @@ export default class Api {
|
||||
}
|
||||
|
||||
validateQuery = async (query) => {
|
||||
if (this.source) {
|
||||
this.source.cancel();
|
||||
}
|
||||
this.source = CancelToken.source();
|
||||
|
||||
const form = new FormData();
|
||||
form.append('query', query)
|
||||
const response = await this.client.post(`/query/validate`, form);
|
||||
const response = await this.client.post(`/query/validate`, form, {
|
||||
cancelToken: this.source.token
|
||||
}).catch(function (thrown) {
|
||||
if (!axios.isCancel(thrown)) {
|
||||
console.error('Validate error', thrown.message);
|
||||
}
|
||||
});
|
||||
|
||||
if (!response) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return response.data;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user