mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-05-08 10:17:49 +00:00
* Basenine MongoDB mess
* Fix more
* Fix the `mongodb` container arguments
* Add Basenine ARM64 binary
* Make the changes related to `leftOff` becoming a string value
* Make `leftOffTop` state string
* Handle `CloseConnection` in `Fetch`
* Upgrade Basenine to `0.7.0`
* Revert the changes in `package.json` and `package-lock.json`
* Fix the `Dockerfile`
* Remove the binaries
* Increase the Basenine up deadline to 20 seconds
* Revert the changes in `shared/kubernetes/provider.go`
* Fix the OAS generator tests
* Protect from race condition
* Fix mutexes
* Fix unlock
* Fix logging data types
* Try to stabilize the tests
* Remove the `replace` statement
* Revert the change done in 2899414f2b so that `leftOff` is not changed
* Change `leftOffBottom` empty string default value to `latest`
* Upgrade Basenine to `0.7.1`
* Handle the Basenine client library errors better
* Use `DEFAULT_QUERY` constant
* Remove `min=-1`
* Replace some `Errorf`s with `Panicf`s
* Remove the closure in `runGenerator` method
* Remove an unnecessary check
Co-authored-by: M. Mert Yildiran <mehmet@up9.com>
Co-authored-by: Andrey Pokhilko <apc4@ya.ru>
Co-authored-by: undera <undera@undera-old-desktop.home>
Co-authored-by: AmitUp9 <96980485+AmitUp9@users.noreply.github.com>
95 lines
2.3 KiB
Go
95 lines
2.3 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
basenine "github.com/up9inc/basenine/client/go"
|
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
type EntryStreamer interface {
|
|
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
|
}
|
|
|
|
// BasenineEntryStreamer is a stateless EntryStreamer whose Get method queries
// a Basenine server and relays the results to a socket.
type BasenineEntryStreamer struct{}
|
|
|
|
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
|
var connection *basenine.Connection
|
|
|
|
entryStreamerSocketConnector := dependency.GetInstance(dependency.EntryStreamerSocketConnector).(EntryStreamerSocketConnector)
|
|
|
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
if err != nil {
|
|
logger.Log.Errorf("failed to establish a connection to Basenine: %v", err)
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
data := make(chan []byte)
|
|
meta := make(chan []byte)
|
|
|
|
query := params.Query
|
|
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
|
if err != nil {
|
|
entryStreamerSocketConnector.SendToastError(socketId, err)
|
|
}
|
|
|
|
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
|
for {
|
|
bytes := <-data
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var entry *tapApi.Entry
|
|
err = json.Unmarshal(bytes, &entry)
|
|
if err != nil {
|
|
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
|
continue
|
|
}
|
|
|
|
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
|
|
}
|
|
}
|
|
|
|
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
|
|
for {
|
|
bytes := <-meta
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var metadata *basenine.Metadata
|
|
err = json.Unmarshal(bytes, &metadata)
|
|
if err != nil {
|
|
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
|
continue
|
|
}
|
|
|
|
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
|
|
}
|
|
}
|
|
|
|
go handleDataChannel(connection, data)
|
|
go handleMetaChannel(connection, meta)
|
|
|
|
if err = connection.Query(query, data, meta); err != nil {
|
|
logger.Log.Panicf("Query mode call failed: %v", err)
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
data <- []byte(basenine.CloseChannel)
|
|
meta <- []byte(basenine.CloseChannel)
|
|
connection.Close()
|
|
}()
|
|
|
|
return nil
|
|
}
|