mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-15 18:39:58 +00:00
Compare commits
6 Commits
31.0-dev13
...
31.0-dev18
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
76a6a77a14 | ||
|
|
2bfc523bbc | ||
|
|
66ba778384 | ||
|
|
7adbf7bf1b | ||
|
|
a97b5b3b38 | ||
|
|
aa8dcc5f5c |
2
.github/workflows/acceptance_tests.yml
vendored
2
.github/workflows/acceptance_tests.yml
vendored
@@ -43,7 +43,7 @@ jobs:
|
||||
with:
|
||||
status: ${{ job.status }}
|
||||
notification_title: 'Mizu {workflow} has {status_message}'
|
||||
message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit <{commit_url}|{commit_sha} ${{ github.event.head_commit.message }}> ${{ github.event.head_commit.committer.name }} <${{ github.event.head_commit.committer.email }}>'
|
||||
message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit <{commit_url}|{commit_sha}> by ${{ github.event.head_commit.committer.name }} <${{ github.event.head_commit.committer.email }}> ```${{ github.event.head_commit.message }}```'
|
||||
footer: 'Linked Repo <{repo_url}|{repo}>'
|
||||
notify_when: 'failure'
|
||||
env:
|
||||
|
||||
53
.github/workflows/static_code_analysis.yml
vendored
53
.github/workflows/static_code_analysis.yml
vendored
@@ -15,6 +15,9 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 2
|
||||
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.17'
|
||||
@@ -24,67 +27,117 @@ jobs:
|
||||
sudo apt update
|
||||
sudo apt install -y libpcap-dev
|
||||
|
||||
- name: Check Agent modified files
|
||||
id: agent_modified_files
|
||||
run: devops/check_modified_files.sh agent/
|
||||
|
||||
- name: Go lint - agent
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.agent_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: agent
|
||||
args: --timeout=3m
|
||||
|
||||
- name: Check shared modified files
|
||||
id: shared_modified_files
|
||||
run: devops/check_modified_files.sh shared/
|
||||
|
||||
- name: Go lint - shared
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.shared_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: shared
|
||||
args: --timeout=3m
|
||||
|
||||
- name: Check tap modified files
|
||||
id: tap_modified_files
|
||||
run: devops/check_modified_files.sh tap/
|
||||
|
||||
- name: Go lint - tap
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap
|
||||
args: --timeout=3m
|
||||
|
||||
- name: Check cli modified files
|
||||
id: cli_modified_files
|
||||
run: devops/check_modified_files.sh cli/
|
||||
|
||||
- name: Go lint - CLI
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.cli_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: cli
|
||||
args: --timeout=3m
|
||||
|
||||
- name: Check acceptanceTests modified files
|
||||
id: acceptanceTests_modified_files
|
||||
run: devops/check_modified_files.sh acceptanceTests/
|
||||
|
||||
- name: Go lint - acceptanceTests
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.acceptanceTests_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: acceptanceTests
|
||||
args: --timeout=3m
|
||||
|
||||
- name: Check tap/api modified files
|
||||
id: tap_api_modified_files
|
||||
run: devops/check_modified_files.sh tap/api/
|
||||
|
||||
- name: Go lint - tap/api
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_api_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap/api
|
||||
|
||||
- name: Check tap/extensions/amqp modified files
|
||||
id: tap_amqp_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/amqp/
|
||||
|
||||
- name: Go lint - tap/extensions/amqp
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_amqp_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap/extensions/amqp
|
||||
|
||||
- name: Check tap/extensions/http modified files
|
||||
id: tap_http_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/http/
|
||||
|
||||
- name: Go lint - tap/extensions/http
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_http_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap/extensions/http
|
||||
|
||||
- name: Check tap/extensions/kafka modified files
|
||||
id: tap_kafka_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/kafka/
|
||||
|
||||
- name: Go lint - tap/extensions/kafka
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_kafka_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap/extensions/kafka
|
||||
|
||||
- name: Check tap/extensions/redis modified files
|
||||
id: tap_redis_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/redis/
|
||||
|
||||
- name: Go lint - tap/extensions/redis
|
||||
uses: golangci/golangci-lint-action@v2
|
||||
if: steps.tap_redis_modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
version: latest
|
||||
working-directory: tap/extensions/redis
|
||||
|
||||
@@ -373,4 +373,6 @@ func initializeDependencies() {
|
||||
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance(nil) })
|
||||
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
|
||||
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
|
||||
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })
|
||||
}
|
||||
|
||||
57
agent/pkg/api/entry_streamer_socket_connector.go
Normal file
57
agent/pkg/api/entry_streamer_socket_connector.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type EntryStreamerSocketConnector interface {
|
||||
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
|
||||
SendMetadata(socketId int, metadata *basenine.Metadata)
|
||||
SendToastError(socketId int, err error)
|
||||
CleanupSocket(socketId int)
|
||||
}
|
||||
|
||||
type DefaultEntryStreamerSocketConnector struct{}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
|
||||
var message []byte
|
||||
if params.EnableFullEntries {
|
||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||
} else {
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
||||
}
|
||||
|
||||
if err := SendToSocket(socketId, message); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
|
||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
|
||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||
Type: "error",
|
||||
AutoClose: 5000,
|
||||
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
||||
})
|
||||
if err := SendToSocket(socketId, toastBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
||||
socketObj := connectedWebsockets[socketId]
|
||||
socketCleanup(socketId, socketObj)
|
||||
}
|
||||
92
agent/pkg/api/socket_data_streamer.go
Normal file
92
agent/pkg/api/socket_data_streamer.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type EntryStreamer interface {
|
||||
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
||||
}
|
||||
|
||||
type BasenineEntryStreamer struct{}
|
||||
|
||||
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
||||
var connection *basenine.Connection
|
||||
|
||||
entryStreamerSocketConnector := dependency.GetInstance(dependency.EntryStreamerSocketConnector).(EntryStreamerSocketConnector)
|
||||
|
||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("failed to establish a connection to Basenine: %v", err)
|
||||
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||
return err
|
||||
}
|
||||
|
||||
data := make(chan []byte)
|
||||
meta := make(chan []byte)
|
||||
|
||||
query := params.Query
|
||||
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
||||
if err != nil {
|
||||
entryStreamerSocketConnector.SendToastError(socketId, err)
|
||||
}
|
||||
|
||||
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
||||
for {
|
||||
bytes := <-data
|
||||
|
||||
if string(bytes) == basenine.CloseChannel {
|
||||
return
|
||||
}
|
||||
|
||||
var entry *tapApi.Entry
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("error unmarshalling entry: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
|
||||
}
|
||||
}
|
||||
|
||||
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
|
||||
for {
|
||||
bytes := <-meta
|
||||
|
||||
if string(bytes) == basenine.CloseChannel {
|
||||
return
|
||||
}
|
||||
|
||||
var metadata *basenine.Metadata
|
||||
err = json.Unmarshal(bytes, &metadata)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
go handleDataChannel(connection, data)
|
||||
go handleMetaChannel(connection, meta)
|
||||
|
||||
connection.Query(query, data, meta)
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
data <- []byte(basenine.CloseChannel)
|
||||
meta <- []byte(basenine.CloseChannel)
|
||||
connection.Close()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,19 +1,15 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@@ -25,9 +21,9 @@ func InitExtensionsMap(ref map[string]*tapApi.Extension) {
|
||||
}
|
||||
|
||||
type EventHandlers interface {
|
||||
WebSocketConnect(socketId int, isTapper bool)
|
||||
WebSocketConnect(c *gin.Context, socketId int, isTapper bool)
|
||||
WebSocketDisconnect(socketId int, isTapper bool)
|
||||
WebSocketMessage(socketId int, message []byte)
|
||||
WebSocketMessage(socketId int, isTapper bool, message []byte)
|
||||
}
|
||||
|
||||
type SocketConnection struct {
|
||||
@@ -62,11 +58,11 @@ func init() {
|
||||
|
||||
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
|
||||
SocketGetBrowserHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, false)
|
||||
websocketHandler(c, eventHandlers, false)
|
||||
}
|
||||
|
||||
SocketGetTapperHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, true)
|
||||
websocketHandler(c, eventHandlers, true)
|
||||
}
|
||||
|
||||
app.GET("/ws", func(c *gin.Context) {
|
||||
@@ -78,10 +74,10 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
|
||||
})
|
||||
}
|
||||
|
||||
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
|
||||
ws, err := websocketUpgrader.Upgrade(w, r, nil)
|
||||
func websocketHandler(c *gin.Context, eventHandlers EventHandlers, isTapper bool) {
|
||||
ws, err := websocketUpgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
|
||||
logger.Log.Errorf("failed to set websocket upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -93,30 +89,11 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
|
||||
websocketIdsLock.Unlock()
|
||||
|
||||
var connection *basenine.Connection
|
||||
var isQuerySet bool
|
||||
|
||||
// `!isTapper` means it's a connection from the web UI
|
||||
if !isTapper {
|
||||
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
||||
socketCleanup(socketId, connectedWebsockets[socketId])
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
data := make(chan []byte)
|
||||
meta := make(chan []byte)
|
||||
|
||||
defer func() {
|
||||
socketCleanup(socketId, connectedWebsockets[socketId])
|
||||
data <- []byte(basenine.CloseChannel)
|
||||
meta <- []byte(basenine.CloseChannel)
|
||||
connection.Close()
|
||||
}()
|
||||
|
||||
eventHandlers.WebSocketConnect(socketId, isTapper)
|
||||
eventHandlers.WebSocketConnect(c, socketId, isTapper)
|
||||
|
||||
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(utils.StartTime)
|
||||
|
||||
@@ -124,127 +101,32 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
|
||||
var params WebSocketParams
|
||||
|
||||
for {
|
||||
_, msg, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
if _, ok := err.(*websocket.CloseError); ok {
|
||||
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId)
|
||||
logger.Log.Debugf("received websocket close message, socket id: %d", socketId)
|
||||
} else {
|
||||
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||
logger.Log.Errorf("error reading message, socket id: %d, error: %v", socketId, err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
if !isTapper && !isQuerySet {
|
||||
if err := json.Unmarshal(msg, ¶ms); err != nil {
|
||||
logger.Log.Errorf("Error unmarshalling parameters: %v", socketId, err)
|
||||
continue
|
||||
}
|
||||
|
||||
query := params.Query
|
||||
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
||||
if err != nil {
|
||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||
Type: "error",
|
||||
AutoClose: 5000,
|
||||
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
||||
})
|
||||
if err := SendToSocket(socketId, toastBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
isQuerySet = true
|
||||
|
||||
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
||||
for {
|
||||
bytes := <-data
|
||||
|
||||
if string(bytes) == basenine.CloseChannel {
|
||||
return
|
||||
}
|
||||
|
||||
var entry *tapApi.Entry
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
var message []byte
|
||||
if params.EnableFullEntries {
|
||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||
} else {
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
||||
}
|
||||
|
||||
if err := SendToSocket(socketId, message); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
|
||||
for {
|
||||
bytes := <-meta
|
||||
|
||||
if string(bytes) == basenine.CloseChannel {
|
||||
return
|
||||
}
|
||||
|
||||
var metadata *basenine.Metadata
|
||||
err = json.Unmarshal(bytes, &metadata)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go handleDataChannel(connection, data)
|
||||
go handleMetaChannel(connection, meta)
|
||||
|
||||
connection.Query(query, data, meta)
|
||||
} else {
|
||||
eventHandlers.WebSocketMessage(socketId, msg)
|
||||
}
|
||||
eventHandlers.WebSocketMessage(socketId, isTapper, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
err := socketConnection.connection.Close()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error closing socket connection for socket id %d: %v", socketId, err)
|
||||
}
|
||||
|
||||
websocketIdsLock.Lock()
|
||||
connectedWebsockets[socketId] = nil
|
||||
websocketIdsLock.Unlock()
|
||||
|
||||
socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper)
|
||||
}
|
||||
|
||||
func SendToSocket(socketId int, message []byte) error {
|
||||
socketObj := connectedWebsockets[socketId]
|
||||
if socketObj == nil {
|
||||
return fmt.Errorf("Socket %v is disconnected", socketId)
|
||||
return fmt.Errorf("socket %v is disconnected", socketId)
|
||||
}
|
||||
|
||||
var sent = false
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
if !sent {
|
||||
logger.Log.Error("Socket timed out")
|
||||
logger.Log.Error("socket timed out")
|
||||
socketCleanup(socketId, socketObj)
|
||||
}
|
||||
})
|
||||
@@ -255,7 +137,20 @@ func SendToSocket(socketId int, message []byte) error {
|
||||
sent = true
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to write message to socket %v, err: %w", socketId, err)
|
||||
return fmt.Errorf("failed to write message to socket %v, err: %w", socketId, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
err := socketConnection.connection.Close()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error closing socket connection for socket id %d: %v", socketId, err)
|
||||
}
|
||||
|
||||
websocketIdsLock.Lock()
|
||||
connectedWebsockets[socketId] = nil
|
||||
websocketIdsLock.Unlock()
|
||||
|
||||
socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper)
|
||||
}
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
|
||||
"github.com/up9inc/mizu/agent/pkg/up9"
|
||||
@@ -17,7 +18,11 @@ import (
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
var browserClientSocketUUIDs = make([]int, 0)
|
||||
type BrowserClient struct {
|
||||
dataStreamCancelFunc context.CancelFunc
|
||||
}
|
||||
|
||||
var browserClients = make(map[int]*BrowserClient, 0)
|
||||
var tapperClientSocketUUIDs = make([]int, 0)
|
||||
var socketListLock = sync.Mutex{}
|
||||
|
||||
@@ -30,7 +35,7 @@ func init() {
|
||||
go up9.UpdateAnalyzeStatus(BroadcastToBrowserClients)
|
||||
}
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(_ *gin.Context, socketId int, isTapper bool) {
|
||||
if isTapper {
|
||||
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||
tappers.Connected()
|
||||
@@ -45,7 +50,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
|
||||
socketListLock.Lock()
|
||||
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
||||
browserClients[socketId] = &BrowserClient{}
|
||||
socketListLock.Unlock()
|
||||
|
||||
BroadcastTappedPodsStatus()
|
||||
@@ -63,13 +68,16 @@ func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||
} else {
|
||||
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||
socketListLock.Lock()
|
||||
removeSocketUUIDFromBrowserSlice(socketId)
|
||||
if browserClients[socketId] != nil && browserClients[socketId].dataStreamCancelFunc != nil {
|
||||
browserClients[socketId].dataStreamCancelFunc()
|
||||
}
|
||||
delete(browserClients, socketId)
|
||||
socketListLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func BroadcastToBrowserClients(message []byte) {
|
||||
for _, socketId := range browserClientSocketUUIDs {
|
||||
for socketId := range browserClients {
|
||||
go func(socketId int) {
|
||||
if err := SendToSocket(socketId, message); err != nil {
|
||||
logger.Log.Error(err)
|
||||
@@ -88,7 +96,33 @@ func BroadcastToTapperClients(message []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
func (h *RoutesEventHandlers) WebSocketMessage(socketId int, isTapper bool, message []byte) {
|
||||
if isTapper {
|
||||
HandleTapperIncomingMessage(message, h.SocketOutChannel, BroadcastToBrowserClients)
|
||||
} else {
|
||||
// we initiate the basenine stream after the first websocket message we receive (it contains the entry query), we then store a cancelfunc to later cancel this stream
|
||||
if browserClients[socketId] != nil && browserClients[socketId].dataStreamCancelFunc == nil {
|
||||
var params WebSocketParams
|
||||
if err := json.Unmarshal(message, ¶ms); err != nil {
|
||||
logger.Log.Errorf("Error: %v", socketId, err)
|
||||
return
|
||||
}
|
||||
|
||||
entriesStreamer := dependency.GetInstance(dependency.EntriesSocketStreamer).(EntryStreamer)
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
err := entriesStreamer.Get(ctx, socketId, ¶ms)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error initializing basenine stream for browser socket %d %+v", socketId, err)
|
||||
cancelFunc()
|
||||
} else {
|
||||
browserClients[socketId].dataStreamCancelFunc = cancelFunc
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func HandleTapperIncomingMessage(message []byte, socketOutChannel chan<- *tapApi.OutputChannelItem, broadcastMessageFunc func([]byte)) {
|
||||
var socketMessageBase shared.WebSocketMessageMetadata
|
||||
err := json.Unmarshal(message, &socketMessageBase)
|
||||
if err != nil {
|
||||
@@ -102,7 +136,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
|
||||
h.SocketOutChannel <- tappedEntryMessage.Data
|
||||
socketOutChannel <- tappedEntryMessage.Data
|
||||
}
|
||||
case shared.WebSocketMessageTypeUpdateStatus:
|
||||
var statusMessage shared.WebSocketStatusMessage
|
||||
@@ -110,15 +144,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
BroadcastToBrowserClients(message)
|
||||
}
|
||||
case shared.WebsocketMessageTypeOutboundLink:
|
||||
var outboundLinkMessage models.WebsocketOutboundLinkMessage
|
||||
err := json.Unmarshal(message, &outboundLinkMessage)
|
||||
if err != nil {
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
handleTLSLink(outboundLinkMessage)
|
||||
broadcastMessageFunc(message)
|
||||
}
|
||||
default:
|
||||
logger.Log.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
|
||||
@@ -126,39 +152,6 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
|
||||
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
|
||||
if resolvedNameObject != nil {
|
||||
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
|
||||
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
|
||||
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
|
||||
}
|
||||
cacheKey := fmt.Sprintf("%s -> %s:%d", outboundLinkMessage.Data.Src, outboundLinkMessage.Data.DstIP, outboundLinkMessage.Data.DstPort)
|
||||
_, isInCache := providers.RecentTLSLinks.Get(cacheKey)
|
||||
if isInCache {
|
||||
return
|
||||
} else {
|
||||
providers.RecentTLSLinks.SetDefault(cacheKey, outboundLinkMessage.Data)
|
||||
}
|
||||
marshaledMessage, err := json.Marshal(outboundLinkMessage)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
|
||||
} else {
|
||||
logger.Log.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
|
||||
BroadcastToBrowserClients(marshaledMessage)
|
||||
}
|
||||
}
|
||||
|
||||
func removeSocketUUIDFromBrowserSlice(uuidToRemove int) {
|
||||
newUUIDSlice := make([]int, 0, len(browserClientSocketUUIDs))
|
||||
for _, uuid := range browserClientSocketUUIDs {
|
||||
if uuid != uuidToRemove {
|
||||
newUUIDSlice = append(newUUIDSlice, uuid)
|
||||
}
|
||||
}
|
||||
browserClientSocketUUIDs = newUUIDSlice
|
||||
}
|
||||
|
||||
func removeSocketUUIDFromTapperSlice(uuidToRemove int) {
|
||||
newUUIDSlice := make([]int, 0, len(tapperClientSocketUUIDs))
|
||||
for _, uuid := range tapperClientSocketUUIDs {
|
||||
|
||||
@@ -6,4 +6,6 @@ const (
|
||||
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
||||
OasGeneratorDependency = "OasGeneratorDependency"
|
||||
EntriesProvider = "EntriesProvider"
|
||||
EntriesSocketStreamer = "EntriesSocketStreamer"
|
||||
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"
|
||||
)
|
||||
|
||||
@@ -53,7 +53,16 @@ func representMapSliceAsTable(mapSlice []interface{}, selectorPrefix string) (re
|
||||
h := item.(map[string]interface{})
|
||||
key := h["name"].(string)
|
||||
value := h["value"]
|
||||
switch reflect.TypeOf(value).Kind() {
|
||||
|
||||
var reflectKind reflect.Kind
|
||||
reflectType := reflect.TypeOf(value)
|
||||
if reflectType == nil {
|
||||
reflectKind = reflect.Interface
|
||||
} else {
|
||||
reflectKind = reflect.TypeOf(value).Kind()
|
||||
}
|
||||
|
||||
switch reflectKind {
|
||||
case reflect.Slice:
|
||||
fallthrough
|
||||
case reflect.Array:
|
||||
|
||||
@@ -42,10 +42,10 @@ const OasModal = ({ openModal, handleCloseModal, getOasServices, getOasByService
|
||||
try {
|
||||
const data = await getOasByService(selectedService ? selectedService : oasServices[0]);
|
||||
setSelectedServiceSpec(data);
|
||||
} catch (e) {
|
||||
toast.error("Error occurred while fetching service OAS spec");
|
||||
console.error(e);
|
||||
}
|
||||
} catch (e) {
|
||||
toast.error("Error occurred while fetching service OAS spec");
|
||||
console.error(e);
|
||||
}
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
@@ -61,7 +61,7 @@ const OasModal = ({ openModal, handleCloseModal, getOasServices, getOasByService
|
||||
|
||||
useEffect(() => {
|
||||
onSelectedOASService(null);
|
||||
},[oasServices])
|
||||
}, [oasServices])
|
||||
|
||||
return (
|
||||
<Modal
|
||||
@@ -80,28 +80,28 @@ const OasModal = ({ openModal, handleCloseModal, getOasServices, getOasByService
|
||||
<div className={style.boxContainer}>
|
||||
<div className={style.selectHeader}>
|
||||
<div><img src={openApiLogo} alt="openAPI" className={style.openApilogo} /></div>
|
||||
<div className={style.title}>OpenAPI</div>
|
||||
<div className={style.title}>Service Catalog</div>
|
||||
</div>
|
||||
<div style={{ cursor: "pointer" }}>
|
||||
<img src={closeIcon} alt="close" onClick={handleCloseModal} />
|
||||
</div>
|
||||
</div>
|
||||
<div className={style.selectContainer} >
|
||||
<FormControl>
|
||||
<Select
|
||||
labelId="service-select-label"
|
||||
id="service-select"
|
||||
value={selectedServiceName}
|
||||
onChangeCb={onSelectedOASService}
|
||||
>
|
||||
{oasServices.map((service) => (
|
||||
<MenuItem key={service} value={service}>
|
||||
{service}
|
||||
</MenuItem>
|
||||
))}
|
||||
</Select>
|
||||
</FormControl>
|
||||
</div>
|
||||
<FormControl>
|
||||
<Select
|
||||
labelId="service-select-label"
|
||||
id="service-select"
|
||||
value={selectedServiceName}
|
||||
onChangeCb={onSelectedOASService}
|
||||
>
|
||||
{oasServices.map((service) => (
|
||||
<MenuItem key={service} value={service}>
|
||||
{service}
|
||||
</MenuItem>
|
||||
))}
|
||||
</Select>
|
||||
</FormControl>
|
||||
</div>
|
||||
<div className={style.borderLine}></div>
|
||||
<div className={style.redoc}>
|
||||
{selectedServiceSpec && <RedocStandalone
|
||||
|
||||
@@ -1,18 +1,17 @@
|
||||
import React, { CSSProperties } from "react";
|
||||
import infoImg from 'assets/info.svg';
|
||||
import styles from "./style/InformationIcon.module.sass"
|
||||
|
||||
const DEFUALT_LINK = "https://getmizu.io/docs"
|
||||
|
||||
export interface InformationIconProps{
|
||||
export interface InformationIconProps {
|
||||
link?: string,
|
||||
style? : CSSProperties
|
||||
style?: CSSProperties
|
||||
}
|
||||
|
||||
export const InformationIcon: React.FC<InformationIconProps> = ({link,style}) => {
|
||||
export const InformationIcon: React.FC<InformationIconProps> = ({ link, style }) => {
|
||||
return <React.Fragment>
|
||||
<a href={DEFUALT_LINK ? DEFUALT_LINK : link} style={style} className={styles.flex} title="documentation" target="_blank">
|
||||
<img className="headerIcon" src={infoImg} alt="Info icon"/>
|
||||
<a href={DEFUALT_LINK ? DEFUALT_LINK : link} style={style} className={styles.linkStyle} title="documentation" target="_blank">
|
||||
<span>Docs</span>
|
||||
</a>
|
||||
</React.Fragment>
|
||||
}
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M19 21H6.14286C5.07143 21 4 20.32 4 18.96C4 17.6 5.07143 16.92 6.14286 16.92H19V4H6.14286C5.07143 4 4 5.02 4 6.04V18.96M16.8571 17.6V20.32V17.6Z" stroke="#627EF7" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"/>
|
||||
<rect x="8" y="7" width="7" height="2" fill="#627EF7"/>
|
||||
<rect x="8" y="11" width="4" height="2" fill="#627EF7"/>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 454 B |
@@ -1,2 +1,8 @@
|
||||
.flex
|
||||
display: flex
|
||||
.linkStyle
|
||||
display: flex
|
||||
color: #18253d
|
||||
text-decoration: none
|
||||
font-family: "Ubuntu", sans-serif
|
||||
font-style: normal
|
||||
font-weight: 600
|
||||
font-size: 14px
|
||||
@@ -4,7 +4,7 @@
|
||||
position: absolute
|
||||
transform: translate(-50%, -3px)
|
||||
left: 50%
|
||||
z-index: 9999
|
||||
z-index: 100
|
||||
min-width: 200px
|
||||
background: $blue-color
|
||||
color: rgba(255,255,255,0.75)
|
||||
@@ -19,7 +19,7 @@
|
||||
overflow: hidden
|
||||
max-width: clamp(150px,50%,600px)
|
||||
|
||||
&.banner
|
||||
&.banner
|
||||
top: 53px
|
||||
|
||||
.podsCount
|
||||
@@ -41,7 +41,7 @@
|
||||
table
|
||||
width: 100%
|
||||
margin-top: 20px
|
||||
|
||||
|
||||
tbody
|
||||
max-height: 70vh
|
||||
overflow-y: auto
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import React, {useEffect, useState} from "react";
|
||||
import React, { useState } from "react";
|
||||
import { Button } from "@material-ui/core";
|
||||
import Api, { MizuWebsocketURL } from "../../../helpers/api";
|
||||
import debounce from 'lodash/debounce';
|
||||
import {useSetRecoilState, useRecoilState} from "recoil";
|
||||
import {useCommonStyles} from "../../../helpers/commonStyle"
|
||||
import { useRecoilState } from "recoil";
|
||||
import { useCommonStyles } from "../../../helpers/commonStyle"
|
||||
import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
|
||||
import TrafficViewer from "@up9/mizu-common"
|
||||
import "@up9/mizu-common/dist/index.css"
|
||||
@@ -17,13 +17,13 @@ interface TrafficPageProps {
|
||||
|
||||
const api = Api.getInstance();
|
||||
|
||||
export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
|
||||
export const TrafficPage: React.FC<TrafficPageProps> = ({ setAnalyzeStatus }) => {
|
||||
const commonClasses = useCommonStyles();
|
||||
const setServiceMapModalOpen = useSetRecoilState(serviceMapModalOpenAtom);
|
||||
const [serviceMapModalOpen, setServiceMapModalOpen] = useRecoilState(serviceMapModalOpenAtom);
|
||||
const [openOasModal, setOpenOasModal] = useRecoilState(oasModalOpenAtom);
|
||||
const [openWebSocket, setOpenWebSocket] = useState(true);
|
||||
|
||||
const trafficViewerApi = {...api}
|
||||
const trafficViewerApi = { ...api }
|
||||
|
||||
const handleOpenOasModal = () => {
|
||||
setOpenWebSocket(false)
|
||||
@@ -36,37 +36,31 @@ const trafficViewerApi = {...api}
|
||||
}, 500);
|
||||
|
||||
const actionButtons = (window["isOasEnabled"] || window["isServiceMapEnabled"]) &&
|
||||
<div style={{ display: 'flex', height: "100%" }}>
|
||||
{window["isOasEnabled"] && <Button
|
||||
startIcon={<img className="custom" src={services} alt="services"></img>}
|
||||
size="large"
|
||||
variant="contained"
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
style={{ marginRight: 25, textTransform: 'unset' }}
|
||||
onClick={handleOpenOasModal}>
|
||||
OpenAPI Specs
|
||||
</Button>}
|
||||
{window["isServiceMapEnabled"] && <Button
|
||||
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{marginRight:"8%"}}></img>}
|
||||
size="large"
|
||||
variant="contained"
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
onClick={openServiceMapModalDebounce}
|
||||
style={{textTransform: 'unset'}}>
|
||||
Service Map
|
||||
</Button>}
|
||||
</div>
|
||||
|
||||
useEffect(() => {
|
||||
return () => {
|
||||
//closeSocket()
|
||||
}
|
||||
},[])
|
||||
<div style={{ display: 'flex', height: "100%" }}>
|
||||
{window["isOasEnabled"] && <Button
|
||||
startIcon={<img className="custom" src={services} alt="services"></img>}
|
||||
size="large"
|
||||
variant="contained"
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
style={{ marginRight: 25, textTransform: 'unset' }}
|
||||
onClick={handleOpenOasModal}>
|
||||
Service Catalog
|
||||
</Button>}
|
||||
{window["isServiceMapEnabled"] && <Button
|
||||
startIcon={<img src={serviceMap} className="custom" alt="service-map" style={{ marginRight: "8%" }}></img>}
|
||||
size="large"
|
||||
variant="contained"
|
||||
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
|
||||
onClick={openServiceMapModalDebounce}
|
||||
style={{ textTransform: 'unset' }}>
|
||||
Service Map
|
||||
</Button>}
|
||||
</div>
|
||||
|
||||
return (
|
||||
<>
|
||||
<>
|
||||
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={MizuWebsocketURL} isCloseWebSocket={!openWebSocket}
|
||||
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!openOasModal} isDemoBannerView={false}/>
|
||||
</>
|
||||
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!(openOasModal || serviceMapModalOpen)} isDemoBannerView={false} />
|
||||
</>
|
||||
);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user