mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-16 19:10:17 +00:00
Compare commits
5 Commits
33.0-dev24
...
33.0-dev28
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
433253a27b | ||
|
|
00cc94fbe5 | ||
|
|
8feef78ab1 | ||
|
|
992abc99bc | ||
|
|
486d0b1088 |
@@ -210,7 +210,7 @@ func runInHarReaderMode() {
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
|
||||
oasGenerator.Start(nil)
|
||||
oasGenerator.Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/up9inc/mizu/agent/pkg/holder"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
"github.com/up9inc/mizu/agent/pkg/servicemap"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/resolver"
|
||||
@@ -152,6 +153,9 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink)
|
||||
oasGenerator.HandleEntry(mizuEntry)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,12 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"net"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
)
|
||||
|
||||
func TestGetOASServers(t *testing.T) {
|
||||
@@ -37,33 +32,14 @@ func TestGetOASSpec(t *testing.T) {
|
||||
t.Logf("Written body: %s", recorder.Body.String())
|
||||
}
|
||||
|
||||
type fakeConn struct {
|
||||
sendBuffer *bytes.Buffer
|
||||
receiveBuffer *bytes.Buffer
|
||||
}
|
||||
|
||||
func (f fakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) }
|
||||
func (f fakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) }
|
||||
func (fakeConn) Close() error { return nil }
|
||||
func (fakeConn) LocalAddr() net.Addr { return nil }
|
||||
func (fakeConn) RemoteAddr() net.Addr { return nil }
|
||||
func (fakeConn) SetDeadline(t time.Time) error { return nil }
|
||||
func (fakeConn) SetReadDeadline(t time.Time) error { return nil }
|
||||
func (fakeConn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
|
||||
func getRecorderAndContext() (*httptest.ResponseRecorder, *gin.Context) {
|
||||
dummyConn := new(basenine.Connection)
|
||||
dummyConn.Conn = fakeConn{
|
||||
sendBuffer: bytes.NewBufferString("\n"),
|
||||
receiveBuffer: bytes.NewBufferString("\n"),
|
||||
}
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} {
|
||||
return oas.GetDefaultOasGeneratorInstance()
|
||||
})
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(recorder)
|
||||
oas.GetDefaultOasGeneratorInstance().Start(dummyConn)
|
||||
oas.GetDefaultOasGeneratorInstance().Start()
|
||||
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
|
||||
return recorder, c
|
||||
}
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
package oas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"github.com/up9inc/mizu/logger"
|
||||
@@ -19,22 +16,20 @@ var (
|
||||
instance *defaultOasGenerator
|
||||
)
|
||||
|
||||
type OasGeneratorSink interface {
|
||||
HandleEntry(mizuEntry *api.Entry)
|
||||
}
|
||||
|
||||
type OasGenerator interface {
|
||||
Start(conn *basenine.Connection)
|
||||
Start()
|
||||
Stop()
|
||||
IsStarted() bool
|
||||
GetServiceSpecs() *sync.Map
|
||||
SetEntriesQuery(query string) bool
|
||||
}
|
||||
|
||||
type defaultOasGenerator struct {
|
||||
started bool
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
serviceSpecs *sync.Map
|
||||
dbConn *basenine.Connection
|
||||
dbMutex sync.Mutex
|
||||
entriesQuery string
|
||||
}
|
||||
|
||||
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
|
||||
@@ -45,102 +40,29 @@ func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
|
||||
return instance
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) Start(conn *basenine.Connection) {
|
||||
if g.started {
|
||||
return
|
||||
}
|
||||
|
||||
if g.dbConn == nil {
|
||||
if conn == nil {
|
||||
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
|
||||
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
conn = newConn
|
||||
}
|
||||
|
||||
g.dbConn = conn
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
g.cancel = cancel
|
||||
g.ctx = ctx
|
||||
g.serviceSpecs = &sync.Map{}
|
||||
|
||||
func (g *defaultOasGenerator) Start() {
|
||||
g.started = true
|
||||
|
||||
go g.runGenerator()
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) Stop() {
|
||||
if !g.started {
|
||||
return
|
||||
}
|
||||
|
||||
g.started = false
|
||||
|
||||
g.cancel()
|
||||
g.reset()
|
||||
|
||||
g.dbMutex.Lock()
|
||||
defer g.dbMutex.Unlock()
|
||||
if g.dbConn != nil {
|
||||
g.dbConn.Close()
|
||||
g.dbConn = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) IsStarted() bool {
|
||||
return g.started
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) runGenerator() {
|
||||
// Make []byte channels to receive the data and the meta
|
||||
dataChan := make(chan []byte)
|
||||
metaChan := make(chan []byte)
|
||||
|
||||
g.dbMutex.Lock()
|
||||
defer g.dbMutex.Unlock()
|
||||
logger.Log.Infof("Querying DB for OAS generator with query '%s'", g.entriesQuery)
|
||||
if err := g.dbConn.Query("latest", g.entriesQuery, dataChan, metaChan); err != nil {
|
||||
logger.Log.Errorf("Query mode call failed: %v", err)
|
||||
func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) {
|
||||
if !g.started {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-g.ctx.Done():
|
||||
logger.Log.Infof("OAS Generator was canceled")
|
||||
close(dataChan)
|
||||
close(metaChan)
|
||||
return
|
||||
|
||||
case metaBytes, ok := <-metaChan:
|
||||
if !ok {
|
||||
logger.Log.Infof("OAS Generator - meta channel closed")
|
||||
break
|
||||
}
|
||||
logger.Log.Debugf("Meta: %s", metaBytes)
|
||||
|
||||
case dataBytes, ok := <-dataChan:
|
||||
if !ok {
|
||||
logger.Log.Infof("OAS Generator - entries channel closed")
|
||||
break
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Data: %s", dataBytes)
|
||||
e := new(api.Entry)
|
||||
err := json.Unmarshal(dataBytes, e)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
g.handleEntry(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) handleEntry(mizuEntry *api.Entry) {
|
||||
if mizuEntry.Protocol.Name == "http" {
|
||||
dest := mizuEntry.Destination.Name
|
||||
if dest == "" {
|
||||
@@ -210,18 +132,9 @@ func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map {
|
||||
return g.serviceSpecs
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) SetEntriesQuery(query string) bool {
|
||||
changed := g.entriesQuery != query
|
||||
g.entriesQuery = query
|
||||
return changed
|
||||
}
|
||||
|
||||
func NewDefaultOasGenerator() *defaultOasGenerator {
|
||||
return &defaultOasGenerator{
|
||||
started: false,
|
||||
ctx: nil,
|
||||
cancel: nil,
|
||||
serviceSpecs: nil,
|
||||
dbConn: nil,
|
||||
serviceSpecs: &sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func TestOASGen(t *testing.T) {
|
||||
gen := new(defaultOasGenerator)
|
||||
gen := GetDefaultOasGeneratorInstance()
|
||||
|
||||
e := new(har.Entry)
|
||||
err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e)
|
||||
@@ -21,8 +21,7 @@ func TestOASGen(t *testing.T) {
|
||||
Entry: *e,
|
||||
}
|
||||
|
||||
dummyConn := GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
|
||||
gen.Start(dummyConn)
|
||||
gen.Start()
|
||||
gen.handleHARWithSource(ews)
|
||||
g, ok := gen.serviceSpecs.Load("some")
|
||||
if !ok {
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
package oas
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
@@ -13,22 +11,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/chanced/openapi"
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/wI2L/jsondiff"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
)
|
||||
|
||||
func GetFakeDBConn(send string) *basenine.Connection {
|
||||
dummyConn := new(basenine.Connection)
|
||||
dummyConn.Conn = FakeConn{
|
||||
sendBuffer: bytes.NewBufferString(send),
|
||||
receiveBuffer: bytes.NewBufferString(""),
|
||||
}
|
||||
return dummyConn
|
||||
}
|
||||
|
||||
// if started via env, write file into subdir
|
||||
func outputSpec(label string, spec *openapi.OpenAPI, t *testing.T) string {
|
||||
content, err := json.MarshalIndent(spec, "", " ")
|
||||
@@ -278,17 +265,3 @@ func TestLoadValid3_1(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
type FakeConn struct {
|
||||
sendBuffer *bytes.Buffer
|
||||
receiveBuffer *bytes.Buffer
|
||||
}
|
||||
|
||||
func (f FakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) }
|
||||
func (f FakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) }
|
||||
func (FakeConn) Close() error { return nil }
|
||||
func (FakeConn) LocalAddr() net.Addr { return nil }
|
||||
func (FakeConn) RemoteAddr() net.Addr { return nil }
|
||||
func (FakeConn) SetDeadline(t time.Time) error { return nil }
|
||||
func (FakeConn) SetReadDeadline(t time.Time) error { return nil }
|
||||
func (FakeConn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
|
||||
@@ -22,7 +22,14 @@ function run_single_bench() {
|
||||
for ((i=0;i<"$MIZU_BENCHMARK_RUN_COUNT";i++)); do
|
||||
log " $i: Running tapper"
|
||||
rm -f tapper.log
|
||||
nohup ./agent/build/mizuagent --tap --api-server-address ws://localhost:8899/wsTapper -i lo -stats 10 > tapper.log 2>&1 &
|
||||
tapper_args=("--tap" "--api-server-address" "ws://localhost:8899/wsTapper" "-stats" "10" "-ignore-ports" "8899,9099")
|
||||
if [[ $(uname) == "Darwin" ]]
|
||||
then
|
||||
tapper_args+=("-i" "lo0" "-"decoder "Loopback")
|
||||
else
|
||||
tapper_args+=("-i" "lo")
|
||||
fi
|
||||
nohup ./agent/build/mizuagent ${tapper_args[@]} > tapper.log 2>&1 &
|
||||
|
||||
log " $i: Running client (hey)"
|
||||
hey -z $MIZU_BENCHMARK_CLIENT_PERIOD -c $MIZU_BENCHMARK_CLIENTS_COUNT -q $MIZU_BENCHMARK_QPS $MIZU_BENCHMARK_URL > /dev/null || return 1
|
||||
@@ -50,6 +57,7 @@ log "Writing output to $MIZU_BENCHMARK_OUTPUT_DIR"
|
||||
cd $MIZU_HOME || exit 1
|
||||
|
||||
export HOST_MODE=0
|
||||
export SENSITIVE_DATA_FILTERING_OPTIONS='{"DisableRedaction": true}'
|
||||
export MIZU_DEBUG_DISABLE_PCAP=false
|
||||
export MIZU_DEBUG_DISABLE_TCP_REASSEMBLY=false
|
||||
export MIZU_DEBUG_DISABLE_TCP_STREAM=false
|
||||
|
||||
@@ -50,10 +50,9 @@
|
||||
td
|
||||
color: $light-gray
|
||||
padding: 10px
|
||||
font-size: 11px
|
||||
font-size: 15px
|
||||
font-weight: 600
|
||||
padding-top: 5px
|
||||
padding-bottom: 5px
|
||||
padding: 10px
|
||||
|
||||
.nowrap
|
||||
white-space: nowrap
|
||||
|
||||
Reference in New Issue
Block a user