mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-04-07 03:07:25 +00:00
Compare commits
5 Commits
33.0-dev8
...
33.0-dev12
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
74bd4b180f | ||
|
|
8ea2dabb34 | ||
|
|
366d34b8d0 | ||
|
|
5fc3e38c1a | ||
|
|
09a0fca2c2 |
@@ -6,7 +6,6 @@ require (
|
|||||||
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
||||||
github.com/chanced/openapi v0.0.8
|
github.com/chanced/openapi v0.0.8
|
||||||
github.com/djherbis/atime v1.1.0
|
github.com/djherbis/atime v1.1.0
|
||||||
github.com/elastic/go-elasticsearch/v7 v7.17.0
|
|
||||||
github.com/getkin/kin-openapi v0.89.0
|
github.com/getkin/kin-openapi v0.89.0
|
||||||
github.com/gin-contrib/static v0.0.1
|
github.com/gin-contrib/static v0.0.1
|
||||||
github.com/gin-gonic/gin v1.7.7
|
github.com/gin-gonic/gin v1.7.7
|
||||||
|
|||||||
@@ -166,8 +166,6 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
|
|||||||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
github.com/elastic/go-elasticsearch/v7 v7.17.0 h1:0fcSh4qeC/i1+7QU1KXpmq2iUAdMk4l0/vmbtW1+KJM=
|
|
||||||
github.com/elastic/go-elasticsearch/v7 v7.17.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
|
|
||||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
||||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ import (
|
|||||||
"github.com/gin-contrib/static"
|
"github.com/gin-contrib/static"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
|
||||||
"github.com/up9inc/mizu/agent/pkg/entries"
|
"github.com/up9inc/mizu/agent/pkg/entries"
|
||||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||||
"github.com/up9inc/mizu/agent/pkg/models"
|
"github.com/up9inc/mizu/agent/pkg/models"
|
||||||
@@ -205,7 +204,6 @@ func enableExpFeatureIfNeeded() {
|
|||||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
||||||
serviceMapGenerator.Enable()
|
serviceMapGenerator.Enable()
|
||||||
}
|
}
|
||||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||||
@@ -373,6 +371,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
|
|||||||
func initializeDependencies() {
|
func initializeDependencies() {
|
||||||
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
||||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||||
|
dependency.RegisterGenerator(dependency.EntriesInserter, func() interface{} { return api.GetBasenineEntryInserterInstance() })
|
||||||
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
|
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
|
||||||
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
|
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
|
||||||
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })
|
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })
|
||||||
|
|||||||
@@ -5,20 +5,19 @@ import (
|
|||||||
|
|
||||||
basenine "github.com/up9inc/basenine/client/go"
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
"github.com/up9inc/mizu/agent/pkg/models"
|
"github.com/up9inc/mizu/agent/pkg/models"
|
||||||
"github.com/up9inc/mizu/logger"
|
|
||||||
tapApi "github.com/up9inc/mizu/tap/api"
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EntryStreamerSocketConnector interface {
|
type EntryStreamerSocketConnector interface {
|
||||||
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
|
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error
|
||||||
SendMetadata(socketId int, metadata *basenine.Metadata)
|
SendMetadata(socketId int, metadata *basenine.Metadata) error
|
||||||
SendToastError(socketId int, err error)
|
SendToastError(socketId int, err error) error
|
||||||
CleanupSocket(socketId int)
|
CleanupSocket(socketId int)
|
||||||
}
|
}
|
||||||
|
|
||||||
type DefaultEntryStreamerSocketConnector struct{}
|
type DefaultEntryStreamerSocketConnector struct{}
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
|
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error {
|
||||||
var message []byte
|
var message []byte
|
||||||
if params.EnableFullEntries {
|
if params.EnableFullEntries {
|
||||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||||
@@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := SendToSocket(socketId, message); err != nil {
|
if err := SendToSocket(socketId, message); err != nil {
|
||||||
logger.Log.Error(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
|
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error {
|
||||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||||
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
||||||
logger.Log.Error(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
|
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error {
|
||||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||||
Type: "error",
|
Type: "error",
|
||||||
AutoClose: 5000,
|
AutoClose: 5000,
|
||||||
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
||||||
})
|
})
|
||||||
if err := SendToSocket(socketId, toastBytes); err != nil {
|
if err := SendToSocket(socketId, toastBytes); err != nil {
|
||||||
logger.Log.Error(err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import (
|
|||||||
"github.com/up9inc/mizu/agent/pkg/models"
|
"github.com/up9inc/mizu/agent/pkg/models"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
|
||||||
"github.com/up9inc/mizu/agent/pkg/har"
|
"github.com/up9inc/mizu/agent/pkg/har"
|
||||||
"github.com/up9inc/mizu/agent/pkg/holder"
|
"github.com/up9inc/mizu/agent/pkg/holder"
|
||||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||||
@@ -25,10 +24,7 @@ import (
|
|||||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/logger"
|
"github.com/up9inc/mizu/logger"
|
||||||
"github.com/up9inc/mizu/shared"
|
|
||||||
tapApi "github.com/up9inc/mizu/tap/api"
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
|
|
||||||
basenine "github.com/up9inc/basenine/client/go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var k8sResolver *resolver.Resolver
|
var k8sResolver *resolver.Resolver
|
||||||
@@ -103,20 +99,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
panic("Channel of captured messages is nil")
|
panic("Channel of captured messages is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
BasenineReconnect:
|
|
||||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
|
||||||
if err = connection.InsertMode(); err != nil {
|
|
||||||
logger.Log.Errorf("Insert mode call failed: %v", err)
|
|
||||||
connection.Close()
|
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
|
||||||
|
|
||||||
disableOASValidation := false
|
disableOASValidation := false
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
doc, contractContent, router, err := loadOAS(ctx)
|
doc, contractContent, router, err := loadOAS(ctx)
|
||||||
@@ -163,17 +145,13 @@ BasenineReconnect:
|
|||||||
|
|
||||||
providers.EntryAdded(len(data))
|
providers.EntryAdded(len(data))
|
||||||
|
|
||||||
if err = connection.SendText(string(data)); err != nil {
|
entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter)
|
||||||
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
|
if err := entryInserter.Insert(mizuEntry); err != nil {
|
||||||
connection.Close()
|
logger.Log.Errorf("Error inserting entry, err: %v", err)
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||||
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||||
|
|
||||||
elastic.GetInstance().PushEntry(mizuEntry)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
71
agent/pkg/api/socket_data_inserter.go
Normal file
71
agent/pkg/api/socket_data_inserter.go
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
|
"github.com/up9inc/mizu/logger"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EntryInserter interface {
|
||||||
|
Insert(entry *api.Entry) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type BasenineEntryInserter struct {
|
||||||
|
connection *basenine.Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
var instance *BasenineEntryInserter
|
||||||
|
var once sync.Once
|
||||||
|
|
||||||
|
func GetBasenineEntryInserterInstance() *BasenineEntryInserter {
|
||||||
|
once.Do(func() {
|
||||||
|
instance = &BasenineEntryInserter{}
|
||||||
|
})
|
||||||
|
|
||||||
|
return instance
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *BasenineEntryInserter) Insert(entry *api.Entry) error {
|
||||||
|
if e.connection == nil {
|
||||||
|
e.connection = initializeConnection()
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(entry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error marshling entry, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.connection.SendText(string(data)); err != nil {
|
||||||
|
e.connection.Close()
|
||||||
|
e.connection = nil
|
||||||
|
|
||||||
|
return fmt.Errorf("error sending text to database, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func initializeConnection() *basenine.Connection{
|
||||||
|
for {
|
||||||
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = connection.InsertMode(); err != nil {
|
||||||
|
logger.Log.Errorf("Insert mode call failed: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return connection
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
|||||||
meta := make(chan []byte)
|
meta := make(chan []byte)
|
||||||
|
|
||||||
query := params.Query
|
query := params.Query
|
||||||
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
|
||||||
if err != nil {
|
if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
|
||||||
entryStreamerSocketConnector.SendToastError(socketId, err)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
|
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log.Errorf("Fetch error: %v", err.Error())
|
logger.Log.Errorf("Fetch error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
||||||
@@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
|||||||
}
|
}
|
||||||
|
|
||||||
var entry *tapApi.Entry
|
var entry *tapApi.Entry
|
||||||
err = json.Unmarshal(bytes, &entry)
|
if err = json.Unmarshal(bytes, &entry); err != nil {
|
||||||
if err != nil {
|
logger.Log.Debugf("Error unmarshalling entry: %v", err)
|
||||||
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
|
if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
|
||||||
|
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
|||||||
}
|
}
|
||||||
|
|
||||||
var metadata *basenine.Metadata
|
var metadata *basenine.Metadata
|
||||||
err = json.Unmarshal(bytes, &metadata)
|
if err = json.Unmarshal(bytes, &metadata); err != nil {
|
||||||
if err != nil {
|
logger.Log.Debugf("Error unmarshalling metadata: %v", err)
|
||||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
|
if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
|
||||||
|
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con
|
|||||||
}
|
}
|
||||||
|
|
||||||
var firstMetadata *basenine.Metadata
|
var firstMetadata *basenine.Metadata
|
||||||
err = json.Unmarshal(firstMeta, &firstMetadata)
|
if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
|
||||||
if err != nil {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
leftOff = firstMetadata.LeftOff
|
leftOff = firstMetadata.LeftOff
|
||||||
|
|
||||||
var lastMetadata *basenine.Metadata
|
var lastMetadata *basenine.Metadata
|
||||||
err = json.Unmarshal(lastMeta, &lastMetadata)
|
if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
|
||||||
if err != nil {
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
connector.SendMetadata(socketId, lastMetadata)
|
|
||||||
|
|
||||||
data = e.reverseBytesSlice(data)
|
data = e.reverseBytesSlice(data)
|
||||||
for _, row := range data {
|
for _, row := range data {
|
||||||
var entry *tapApi.Entry
|
var entry *tapApi.Entry
|
||||||
err = json.Unmarshal(row, &entry)
|
if err = json.Unmarshal(row, &entry); err != nil {
|
||||||
if err != nil {
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
connector.SendEntry(socketId, entry, params)
|
if err = connector.SendEntry(socketId, entry, params); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ type DependencyContainerType string
|
|||||||
const (
|
const (
|
||||||
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
||||||
OasGeneratorDependency = "OasGeneratorDependency"
|
OasGeneratorDependency = "OasGeneratorDependency"
|
||||||
|
EntriesInserter = "EntriesInserter"
|
||||||
EntriesProvider = "EntriesProvider"
|
EntriesProvider = "EntriesProvider"
|
||||||
EntriesSocketStreamer = "EntriesSocketStreamer"
|
EntriesSocketStreamer = "EntriesSocketStreamer"
|
||||||
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"
|
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"
|
||||||
|
|||||||
@@ -1,116 +0,0 @@
|
|||||||
package elastic
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"crypto/tls"
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/elastic/go-elasticsearch/v7"
|
|
||||||
"github.com/up9inc/mizu/logger"
|
|
||||||
"github.com/up9inc/mizu/shared"
|
|
||||||
"github.com/up9inc/mizu/tap/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
type client struct {
|
|
||||||
es *elasticsearch.Client
|
|
||||||
index string
|
|
||||||
insertedCount int
|
|
||||||
}
|
|
||||||
|
|
||||||
var instance *client
|
|
||||||
var once sync.Once
|
|
||||||
|
|
||||||
func GetInstance() *client {
|
|
||||||
once.Do(func() {
|
|
||||||
instance = newClient()
|
|
||||||
})
|
|
||||||
return instance
|
|
||||||
}
|
|
||||||
|
|
||||||
func (client *client) Configure(config shared.ElasticConfig) {
|
|
||||||
if config.Url == "" || config.User == "" || config.Password == "" {
|
|
||||||
if client.es != nil {
|
|
||||||
client.es = nil
|
|
||||||
}
|
|
||||||
logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
transport := http.DefaultTransport
|
|
||||||
tlsClientConfig := &tls.Config{InsecureSkipVerify: true}
|
|
||||||
transport.(*http.Transport).TLSClientConfig = tlsClientConfig
|
|
||||||
cfg := elasticsearch.Config{
|
|
||||||
Addresses: []string{config.Url},
|
|
||||||
Username: config.User,
|
|
||||||
Password: config.Password,
|
|
||||||
Transport: transport,
|
|
||||||
}
|
|
||||||
|
|
||||||
es, err := elasticsearch.NewClient(cfg)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Errorf("Failed to initialize elastic client %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Have the client instance return a response
|
|
||||||
res, err := es.Info()
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Errorf("Elastic client.Info() ERROR: %v", err)
|
|
||||||
} else {
|
|
||||||
client.es = es
|
|
||||||
client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04")
|
|
||||||
client.insertedCount = 0
|
|
||||||
logger.Log.Infof("Elastic client configured, index: %s, cluster info: %v", client.index, res)
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newClient() *client {
|
|
||||||
return &client{
|
|
||||||
es: nil,
|
|
||||||
index: "",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type httpEntry struct {
|
|
||||||
Source *api.TCP `json:"src"`
|
|
||||||
Destination *api.TCP `json:"dst"`
|
|
||||||
Outgoing bool `json:"outgoing"`
|
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
|
||||||
Request map[string]interface{} `json:"request"`
|
|
||||||
Response map[string]interface{} `json:"response"`
|
|
||||||
ElapsedTime int64 `json:"elapsedTime"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (client *client) PushEntry(entry *api.Entry) {
|
|
||||||
if client.es == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if entry.Protocol.Name != "http" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
entryToPush := httpEntry{
|
|
||||||
Source: entry.Source,
|
|
||||||
Destination: entry.Destination,
|
|
||||||
Outgoing: entry.Outgoing,
|
|
||||||
CreatedAt: entry.StartTime,
|
|
||||||
Request: entry.Request,
|
|
||||||
Response: entry.Response,
|
|
||||||
ElapsedTime: entry.ElapsedTime,
|
|
||||||
}
|
|
||||||
|
|
||||||
entryJson, err := json.Marshal(entryToPush)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Errorf("json.Marshal ERROR: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var buffer bytes.Buffer
|
|
||||||
buffer.WriteString(string(entryJson))
|
|
||||||
res, _ := client.es.Index(client.index, &buffer)
|
|
||||||
if res.StatusCode == 201 {
|
|
||||||
client.insertedCount += 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -164,7 +164,6 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
|
|||||||
ServiceMap: config.Config.ServiceMap,
|
ServiceMap: config.Config.ServiceMap,
|
||||||
OAS: config.Config.OAS,
|
OAS: config.Config.OAS,
|
||||||
Telemetry: config.Config.Telemetry,
|
Telemetry: config.Config.Telemetry,
|
||||||
Elastic: config.Config.Elastic,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mizuAgentConfig
|
return &mizuAgentConfig
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ type ConfigStruct struct {
|
|||||||
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
||||||
ServiceMap bool `yaml:"service-map" default:"true"`
|
ServiceMap bool `yaml:"service-map" default:"true"`
|
||||||
OAS bool `yaml:"oas" default:"true"`
|
OAS bool `yaml:"oas" default:"true"`
|
||||||
Elastic shared.ElasticConfig `yaml:"elastic"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (config *ConfigStruct) validate() error {
|
func (config *ConfigStruct) validate() error {
|
||||||
|
|||||||
@@ -45,13 +45,6 @@ type MizuAgentConfig struct {
|
|||||||
ServiceMap bool `json:"serviceMap"`
|
ServiceMap bool `json:"serviceMap"`
|
||||||
OAS bool `json:"oas"`
|
OAS bool `json:"oas"`
|
||||||
Telemetry bool `json:"telemetry"`
|
Telemetry bool `json:"telemetry"`
|
||||||
Elastic ElasticConfig `json:"elastic"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ElasticConfig struct {
|
|
||||||
User string `yaml:"user,omitempty" default:"" readonly:""`
|
|
||||||
Password string `yaml:"password,omitempty" default:"" readonly:""`
|
|
||||||
Url string `yaml:"url,omitempty" default:"" readonly:""`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WebSocketMessageMetadata struct {
|
type WebSocketMessageMetadata struct {
|
||||||
|
|||||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
|||||||
|
|
||||||
test-pull-expect:
|
test-pull-expect:
|
||||||
@mkdir -p expect
|
@mkdir -p expect
|
||||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
|
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect
|
||||||
|
|||||||
@@ -3,13 +3,14 @@ package kafka
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/text/cases"
|
|
||||||
"golang.org/x/text/language"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"golang.org/x/text/cases"
|
||||||
|
"golang.org/x/text/language"
|
||||||
|
|
||||||
"github.com/fatih/camelcase"
|
"github.com/fatih/camelcase"
|
||||||
"github.com/ohler55/ojg/jp"
|
"github.com/ohler55/ojg/jp"
|
||||||
"github.com/ohler55/ojg/oj"
|
"github.com/ohler55/ojg/oj"
|
||||||
@@ -36,9 +37,14 @@ type KafkaWrapper struct {
|
|||||||
|
|
||||||
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
|
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
|
||||||
requestHeader, _ := json.Marshal([]api.TableData{
|
requestHeader, _ := json.Marshal([]api.TableData{
|
||||||
|
{
|
||||||
|
Name: "ApiKeyName",
|
||||||
|
Value: data["apiKeyName"].(string),
|
||||||
|
Selector: `request.apiKeyName`,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "ApiKey",
|
Name: "ApiKey",
|
||||||
Value: apiNames[int(data["apiKey"].(float64))],
|
Value: int(data["apiKey"].(float64)),
|
||||||
Selector: `request.apiKey`,
|
Selector: `request.apiKey`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
|||||||
statusQuery := ""
|
statusQuery := ""
|
||||||
|
|
||||||
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
||||||
method := apiNames[apiKey]
|
method := entry.Request["apiKeyName"].(string)
|
||||||
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
|
methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method)
|
||||||
|
|
||||||
summary := ""
|
summary := ""
|
||||||
summaryQuery := ""
|
summaryQuery := ""
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
type Request struct {
|
type Request struct {
|
||||||
Size int32 `json:"size"`
|
Size int32 `json:"size"`
|
||||||
|
ApiKeyName string `json:"apiKeyName"`
|
||||||
ApiKey ApiKey `json:"apiKey"`
|
ApiKey ApiKey `json:"apiKey"`
|
||||||
ApiVersion int16 `json:"apiVersion"`
|
ApiVersion int16 `json:"apiVersion"`
|
||||||
CorrelationID int32 `json:"correlationID"`
|
CorrelationID int32 `json:"correlationID"`
|
||||||
@@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca
|
|||||||
|
|
||||||
request := &Request{
|
request := &Request{
|
||||||
Size: size,
|
Size: size,
|
||||||
|
ApiKeyName: apiNames[apiKey],
|
||||||
ApiKey: apiKey,
|
ApiKey: apiKey,
|
||||||
ApiVersion: apiVersion,
|
ApiVersion: apiVersion,
|
||||||
CorrelationID: correlationID,
|
CorrelationID: correlationID,
|
||||||
|
|||||||
@@ -67,7 +67,6 @@
|
|||||||
height: 100%
|
height: 100%
|
||||||
display: flex
|
display: flex
|
||||||
flex-direction: column
|
flex-direction: column
|
||||||
margin-right: 10px
|
|
||||||
width: 100%
|
width: 100%
|
||||||
border-radius: 4px
|
border-radius: 4px
|
||||||
|
|
||||||
@@ -82,17 +81,18 @@
|
|||||||
margin-top: 10px
|
margin-top: 10px
|
||||||
margin-right: 10px
|
margin-right: 10px
|
||||||
|
|
||||||
.protocolsFilterList, .servicesFilter
|
.card
|
||||||
background: white
|
background: white
|
||||||
padding: 10px
|
padding: 10px
|
||||||
border-radius: 4px
|
border-radius: 4px
|
||||||
user-select: none
|
user-select: none
|
||||||
|
box-shadow: 0px 1px 5px #979797
|
||||||
|
|
||||||
.servicesFilter
|
.servicesFilterWrapper
|
||||||
margin-top: 10px
|
margin-top: 20px
|
||||||
|
margin-bottom: 3px
|
||||||
height: 100%
|
height: 100%
|
||||||
overflow: hidden
|
overflow: hidden
|
||||||
border-radius: 4px
|
|
||||||
|
|
||||||
& .servicesFilterList
|
.servicesFilterList
|
||||||
height: calc(100% - 30px - 52px)
|
height: calc(100% - 30px - 52px)
|
||||||
|
|||||||
@@ -158,6 +158,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
|
|||||||
if (checkedProtocols.length === 0) {
|
if (checkedProtocols.length === 0) {
|
||||||
setCheckedProtocols(getProtocolsForFilter.map(x => x.key))
|
setCheckedProtocols(getProtocolsForFilter.map(x => x.key))
|
||||||
}
|
}
|
||||||
|
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||||
}, [getProtocolsForFilter])
|
}, [getProtocolsForFilter])
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
@@ -217,13 +218,13 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
|
|||||||
<div className={styles.filterSection + ` ${isFilterClicked ? styles.show : ""}`}>
|
<div className={styles.filterSection + ` ${isFilterClicked ? styles.show : ""}`}>
|
||||||
<Resizeable minWidth={170} maxWidth={320}>
|
<Resizeable minWidth={170} maxWidth={320}>
|
||||||
<div className={styles.filterWrapper}>
|
<div className={styles.filterWrapper}>
|
||||||
<div className={styles.protocolsFilterList}>
|
<div className={styles.card}>
|
||||||
<SelectList items={getProtocolsForFilter} checkBoxWidth="5%" tableName={"PROTOCOLS"} multiSelect={true}
|
<SelectList items={getProtocolsForFilter} checkBoxWidth="5%" tableName={"PROTOCOLS"} multiSelect={true}
|
||||||
checkedValues={checkedProtocols} setCheckedValues={onProtocolsChange} tableClassName={styles.filters}
|
checkedValues={checkedProtocols} setCheckedValues={onProtocolsChange} tableClassName={styles.filters}
|
||||||
inputSearchClass={styles.servicesFilterSearch} isFilterable={false}/>
|
inputSearchClass={styles.servicesFilterSearch} isFilterable={false}/>
|
||||||
</div>
|
</div>
|
||||||
<div className={styles.servicesFilter}>
|
<div className={styles.servicesFilterWrapper + ` ${styles.card}`}>
|
||||||
<div className={styles.servicesFilterList}>
|
<div className={styles.servicesFilterList}>
|
||||||
<SelectList items={getServicesForFilter} tableName={"SERVICES"} tableClassName={styles.filters} multiSelect={true}
|
<SelectList items={getServicesForFilter} tableName={"SERVICES"} tableClassName={styles.filters} multiSelect={true}
|
||||||
checkBoxWidth="5%" checkedValues={checkedServices} setCheckedValues={onServiceChanges} inputSearchClass={styles.servicesFilterSearch}/>
|
checkBoxWidth="5%" checkedValues={checkedServices} setCheckedValues={onServiceChanges} inputSearchClass={styles.servicesFilterSearch}/>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ const SelectList: React.FC<Props> = ({ items, tableName, checkedValues = [], mul
|
|||||||
|
|
||||||
const tableBody = filteredValues.length === 0 ?
|
const tableBody = filteredValues.length === 0 ?
|
||||||
<tr>
|
<tr>
|
||||||
<td colSpan={2}>
|
<td colSpan={2} className={styles.displayBlock}>
|
||||||
<NoDataMessage messageText={noItemsMessage} />
|
<NoDataMessage messageText={noItemsMessage} />
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
|||||||
@@ -1,44 +1,70 @@
|
|||||||
@import '../../../variables.module'
|
@import '../../../variables.module'
|
||||||
@import '../../../components'
|
@import '../../../components'
|
||||||
|
|
||||||
|
|
||||||
.selectListTable
|
.selectListTable
|
||||||
overflow: auto
|
|
||||||
height: 100%
|
|
||||||
user-select: none // when resizble moved we get unwanted beheviour
|
user-select: none // when resizble moved we get unwanted beheviour
|
||||||
|
height: 100%
|
||||||
|
|
||||||
table
|
table
|
||||||
width: 100%
|
width: 100%
|
||||||
margin-top: 20px
|
|
||||||
border-collapse: collapse
|
border-collapse: collapse
|
||||||
|
table-layout: fixed
|
||||||
|
height: 100%
|
||||||
|
display: flex
|
||||||
|
flex-flow: column
|
||||||
|
height: 100%
|
||||||
|
|
||||||
th
|
thead
|
||||||
color: $blue-gray
|
display: table
|
||||||
text-align: left
|
table-layout: fixed
|
||||||
padding: 10px
|
flex: 0 0 auto
|
||||||
position: sticky
|
width: calc(100% - 0.9em)
|
||||||
top: 0
|
|
||||||
background: $main-background-color
|
|
||||||
font-size: 12px
|
|
||||||
|
|
||||||
tr
|
tbody
|
||||||
border-bottom-width: 1px
|
display: block
|
||||||
border-bottom-color: $data-background-color
|
overflow: auto
|
||||||
border-bottom-style: solid
|
width: 100%
|
||||||
width: 100%
|
height: 100%
|
||||||
|
flex: 1 1 auto
|
||||||
|
|
||||||
td
|
tbody tr:hover
|
||||||
color: $light-gray
|
background: $header-background-color
|
||||||
padding: 10px
|
|
||||||
font-size: 11px
|
th
|
||||||
font-weight: 600
|
color: $blue-gray
|
||||||
padding-top: 5px
|
text-align: left
|
||||||
padding-bottom: 5px
|
padding: 10px
|
||||||
|
background: $main-background-color
|
||||||
|
font-size: 12px
|
||||||
|
|
||||||
|
tr
|
||||||
|
border-bottom-width: 1px
|
||||||
|
border-bottom-color: $data-background-color
|
||||||
|
border-bottom-style: solid
|
||||||
|
width: 100%
|
||||||
|
display: block
|
||||||
|
position: relative
|
||||||
|
display: table
|
||||||
|
table-layout: fixed
|
||||||
|
|
||||||
|
td
|
||||||
|
color: $light-gray
|
||||||
|
padding: 10px
|
||||||
|
font-size: 11px
|
||||||
|
font-weight: 600
|
||||||
|
padding-top: 5px
|
||||||
|
padding-bottom: 5px
|
||||||
|
|
||||||
.nowrap
|
.nowrap
|
||||||
white-space: nowrap
|
white-space: nowrap
|
||||||
|
|
||||||
.totalSelected
|
.totalSelected
|
||||||
font-size: 12px
|
font-size: 12px
|
||||||
color: $light-blue-color
|
color: $light-blue-color
|
||||||
font-weight: 700
|
font-weight: 700
|
||||||
|
|
||||||
|
.displayBlock
|
||||||
|
display: block
|
||||||
|
|
||||||
|
.filterInput
|
||||||
|
margin-bottom: 20px
|
||||||
|
|||||||
Reference in New Issue
Block a user