mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-15 02:19:54 +00:00
Compare commits
10 Commits
33.0-dev9
...
33.0-dev18
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
948af518b5 | ||
|
|
73448b514e | ||
|
|
fc194354bc | ||
|
|
8418802c7e | ||
|
|
bfa834e840 | ||
|
|
5c012641a5 | ||
|
|
74bd4b180f | ||
|
|
8ea2dabb34 | ||
|
|
366d34b8d0 | ||
|
|
5fc3e38c1a |
@@ -193,58 +193,56 @@ function checkFilter(filterDetails) {
|
||||
const entriesForDeeperCheck = 5;
|
||||
|
||||
it(`checking the filter: ${filter}`, function () {
|
||||
waitForFetch50AndPause();
|
||||
cy.get('.w-tc-editor-text').clear();
|
||||
// applying the filter with alt+enter or with the button
|
||||
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
|
||||
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
|
||||
if (!applyByCtrlEnter)
|
||||
cy.get('[type="submit"]').click();
|
||||
|
||||
cy.get('#total-entries').should('not.have.text', '0').then(number => {
|
||||
const totalEntries = number.text();
|
||||
waitForFetch();
|
||||
pauseStream();
|
||||
|
||||
cy.get(`#list [id^=entry]`).last().then(elem => {
|
||||
const element = elem[0];
|
||||
const entryId = getEntryId(element.id);
|
||||
// checks the hover on the last entry (the only one in DOM at the beginning)
|
||||
leftOnHoverCheck(entryId, leftSidePath, filter);
|
||||
cy.get(`#list [id^=entry]`).last().then(elem => {
|
||||
const element = elem[0];
|
||||
const entryId = getEntryId(element.id);
|
||||
|
||||
cy.get('.w-tc-editor-text').clear();
|
||||
// applying the filter with alt+enter or with the button
|
||||
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
|
||||
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
|
||||
if (!applyByCtrlEnter)
|
||||
cy.get('[type="submit"]').click();
|
||||
// only one entry in DOM after filtering, checking all checks on it
|
||||
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
|
||||
leftOnHoverCheck(entryId, leftSidePath, filter);
|
||||
|
||||
waitForFetch50AndPause();
|
||||
rightTextCheck(rightSidePath, rightSideExpectedText);
|
||||
rightOnHoverCheck(rightSidePath, filter);
|
||||
checkRightSideResponseBody();
|
||||
});
|
||||
|
||||
// only one entry in DOM after filtering, checking all checks on it
|
||||
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
|
||||
leftOnHoverCheck(entryId, leftSidePath, filter);
|
||||
resizeToHugeMizu();
|
||||
|
||||
rightTextCheck(rightSidePath, rightSideExpectedText);
|
||||
rightOnHoverCheck(rightSidePath, filter);
|
||||
checkRightSideResponseBody();
|
||||
});
|
||||
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
|
||||
cy.get(`#list [id^=entry]`).each(elem => {
|
||||
const element = elem[0];
|
||||
let entryId = getEntryId(element.id);
|
||||
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
|
||||
});
|
||||
|
||||
resizeToHugeMizu();
|
||||
// making the other 3 checks on the first X entries (longer time for each check)
|
||||
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
|
||||
|
||||
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
|
||||
cy.get(`#list [id^=entry]`).each(elem => {
|
||||
const element = elem[0];
|
||||
let entryId = getEntryId(element.id);
|
||||
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
|
||||
});
|
||||
|
||||
// making the other 3 checks on the first X entries (longer time for each check)
|
||||
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
|
||||
|
||||
// reloading then waiting for the entries number to load
|
||||
resizeToNormalMizu();
|
||||
cy.reload();
|
||||
cy.get('#total-entries', {timeout: refreshWaitTimeout}).should('have.text', totalEntries);
|
||||
})
|
||||
// reloading then waiting for the entries number to load
|
||||
resizeToNormalMizu();
|
||||
cy.reload();
|
||||
waitForFetch();
|
||||
pauseStream();
|
||||
});
|
||||
}
|
||||
|
||||
function waitForFetch50AndPause() {
|
||||
// wait half a second and pause the stream to preserve the DOM
|
||||
cy.wait(500);
|
||||
function waitForFetch() {
|
||||
cy.get('#entries-length', {timeout: refreshWaitTimeout}).should((el) => {
|
||||
expect(parseInt(el.text().trim(), 10)).to.be.greaterThan(20);
|
||||
});
|
||||
}
|
||||
|
||||
function pauseStream() {
|
||||
cy.get('#pause-icon').click();
|
||||
cy.get('#pause-icon').should('not.be.visible');
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ require (
|
||||
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
||||
github.com/chanced/openapi v0.0.8
|
||||
github.com/djherbis/atime v1.1.0
|
||||
github.com/elastic/go-elasticsearch/v7 v7.17.0
|
||||
github.com/getkin/kin-openapi v0.89.0
|
||||
github.com/gin-contrib/static v0.0.1
|
||||
github.com/gin-gonic/gin v1.7.7
|
||||
|
||||
@@ -166,8 +166,6 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
|
||||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.17.0 h1:0fcSh4qeC/i1+7QU1KXpmq2iUAdMk4l0/vmbtW1+KJM=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.17.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/entries"
|
||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
@@ -205,7 +204,6 @@ func enableExpFeatureIfNeeded() {
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
||||
serviceMapGenerator.Enable()
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
|
||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||
|
||||
@@ -5,20 +5,19 @@ import (
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type EntryStreamerSocketConnector interface {
|
||||
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
|
||||
SendMetadata(socketId int, metadata *basenine.Metadata)
|
||||
SendToastError(socketId int, err error)
|
||||
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error
|
||||
SendMetadata(socketId int, metadata *basenine.Metadata) error
|
||||
SendToastError(socketId int, err error) error
|
||||
CleanupSocket(socketId int)
|
||||
}
|
||||
|
||||
type DefaultEntryStreamerSocketConnector struct{}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error {
|
||||
var message []byte
|
||||
if params.EnableFullEntries {
|
||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||
@@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
|
||||
}
|
||||
|
||||
if err := SendToSocket(socketId, message); err != nil {
|
||||
logger.Log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error {
|
||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
|
||||
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error {
|
||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||
Type: "error",
|
||||
AutoClose: 5000,
|
||||
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
||||
})
|
||||
if err := SendToSocket(socketId, toastBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
"github.com/up9inc/mizu/agent/pkg/holder"
|
||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||
@@ -153,8 +152,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||
|
||||
elastic.GetInstance().PushEntry(mizuEntry)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
||||
meta := make(chan []byte)
|
||||
|
||||
query := params.Query
|
||||
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
||||
if err != nil {
|
||||
entryStreamerSocketConnector.SendToastError(socketId, err)
|
||||
if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
|
||||
if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||
return err
|
||||
}
|
||||
|
||||
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Fetch error: %v", err.Error())
|
||||
logger.Log.Errorf("Fetch error: %v", err)
|
||||
}
|
||||
|
||||
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
||||
@@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
||||
}
|
||||
|
||||
var entry *tapApi.Entry
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
||||
if err = json.Unmarshal(bytes, &entry); err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling entry: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
|
||||
if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
|
||||
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
||||
}
|
||||
|
||||
var metadata *basenine.Metadata
|
||||
err = json.Unmarshal(bytes, &metadata)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
||||
if err = json.Unmarshal(bytes, &metadata); err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
|
||||
if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
|
||||
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con
|
||||
}
|
||||
|
||||
var firstMetadata *basenine.Metadata
|
||||
err = json.Unmarshal(firstMeta, &firstMetadata)
|
||||
if err != nil {
|
||||
if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
leftOff = firstMetadata.LeftOff
|
||||
|
||||
var lastMetadata *basenine.Metadata
|
||||
err = json.Unmarshal(lastMeta, &lastMetadata)
|
||||
if err != nil {
|
||||
if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
|
||||
return
|
||||
}
|
||||
connector.SendMetadata(socketId, lastMetadata)
|
||||
|
||||
data = e.reverseBytesSlice(data)
|
||||
for _, row := range data {
|
||||
var entry *tapApi.Entry
|
||||
err = json.Unmarshal(row, &entry)
|
||||
if err != nil {
|
||||
if err = json.Unmarshal(row, &entry); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
connector.SendEntry(socketId, entry, params)
|
||||
if err = connector.SendEntry(socketId, entry, params); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
package elastic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type client struct {
|
||||
es *elasticsearch.Client
|
||||
index string
|
||||
insertedCount int
|
||||
}
|
||||
|
||||
var instance *client
|
||||
var once sync.Once
|
||||
|
||||
func GetInstance() *client {
|
||||
once.Do(func() {
|
||||
instance = newClient()
|
||||
})
|
||||
return instance
|
||||
}
|
||||
|
||||
func (client *client) Configure(config shared.ElasticConfig) {
|
||||
if config.Url == "" || config.User == "" || config.Password == "" {
|
||||
if client.es != nil {
|
||||
client.es = nil
|
||||
}
|
||||
logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled")
|
||||
return
|
||||
}
|
||||
transport := http.DefaultTransport
|
||||
tlsClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||
transport.(*http.Transport).TLSClientConfig = tlsClientConfig
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{config.Url},
|
||||
Username: config.User,
|
||||
Password: config.Password,
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
es, err := elasticsearch.NewClient(cfg)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to initialize elastic client %v", err)
|
||||
}
|
||||
|
||||
// Have the client instance return a response
|
||||
res, err := es.Info()
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Elastic client.Info() ERROR: %v", err)
|
||||
} else {
|
||||
client.es = es
|
||||
client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04")
|
||||
client.insertedCount = 0
|
||||
logger.Log.Infof("Elastic client configured, index: %s, cluster info: %v", client.index, res)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
}
|
||||
|
||||
func newClient() *client {
|
||||
return &client{
|
||||
es: nil,
|
||||
index: "",
|
||||
}
|
||||
}
|
||||
|
||||
type httpEntry struct {
|
||||
Source *api.TCP `json:"src"`
|
||||
Destination *api.TCP `json:"dst"`
|
||||
Outgoing bool `json:"outgoing"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
Request map[string]interface{} `json:"request"`
|
||||
Response map[string]interface{} `json:"response"`
|
||||
ElapsedTime int64 `json:"elapsedTime"`
|
||||
}
|
||||
|
||||
func (client *client) PushEntry(entry *api.Entry) {
|
||||
if client.es == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if entry.Protocol.Name != "http" {
|
||||
return
|
||||
}
|
||||
|
||||
entryToPush := httpEntry{
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
Outgoing: entry.Outgoing,
|
||||
CreatedAt: entry.StartTime,
|
||||
Request: entry.Request,
|
||||
Response: entry.Response,
|
||||
ElapsedTime: entry.ElapsedTime,
|
||||
}
|
||||
|
||||
entryJson, err := json.Marshal(entryToPush)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("json.Marshal ERROR: %v", err)
|
||||
return
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString(string(entryJson))
|
||||
res, _ := client.es.Index(client.index, &buffer)
|
||||
if res.StatusCode == 201 {
|
||||
client.insertedCount += 1
|
||||
}
|
||||
}
|
||||
@@ -164,7 +164,6 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
|
||||
ServiceMap: config.Config.ServiceMap,
|
||||
OAS: config.Config.OAS,
|
||||
Telemetry: config.Config.Telemetry,
|
||||
Elastic: config.Config.Elastic,
|
||||
}
|
||||
|
||||
return &mizuAgentConfig
|
||||
|
||||
@@ -41,7 +41,6 @@ type ConfigStruct struct {
|
||||
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
||||
ServiceMap bool `yaml:"service-map" default:"true"`
|
||||
OAS bool `yaml:"oas" default:"true"`
|
||||
Elastic shared.ElasticConfig `yaml:"elastic"`
|
||||
}
|
||||
|
||||
func (config *ConfigStruct) validate() error {
|
||||
|
||||
@@ -45,13 +45,6 @@ type MizuAgentConfig struct {
|
||||
ServiceMap bool `json:"serviceMap"`
|
||||
OAS bool `json:"oas"`
|
||||
Telemetry bool `json:"telemetry"`
|
||||
Elastic ElasticConfig `json:"elastic"`
|
||||
}
|
||||
|
||||
type ElasticConfig struct {
|
||||
User string `yaml:"user,omitempty" default:"" readonly:""`
|
||||
Password string `yaml:"password,omitempty" default:"" readonly:""`
|
||||
Url string `yaml:"url,omitempty" default:"" readonly:""`
|
||||
}
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
|
||||
@@ -104,11 +104,6 @@ type OutputChannelItem struct {
|
||||
Namespace string
|
||||
}
|
||||
|
||||
type ProtoIdentifier struct {
|
||||
Protocol *Protocol
|
||||
IsClosedOthers bool
|
||||
}
|
||||
|
||||
type ReadProgress struct {
|
||||
readBytes int
|
||||
lastCurrent int
|
||||
@@ -123,6 +118,11 @@ func (p *ReadProgress) Current() (n int) {
|
||||
return p.lastCurrent
|
||||
}
|
||||
|
||||
func (p *ReadProgress) Reset() {
|
||||
p.readBytes = 0
|
||||
p.lastCurrent = 0
|
||||
}
|
||||
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
@@ -419,13 +419,12 @@ type TcpReader interface {
|
||||
GetCaptureTime() time.Time
|
||||
GetEmitter() Emitter
|
||||
GetIsClosed() bool
|
||||
GetExtension() *Extension
|
||||
}
|
||||
|
||||
type TcpStream interface {
|
||||
SetProtocol(protocol *Protocol)
|
||||
GetOrigin() Capture
|
||||
GetProtoIdentifier() *ProtoIdentifier
|
||||
GetProtocol() *Protocol
|
||||
GetReqResMatchers() []RequestResponseMatcher
|
||||
GetIsTapTarget() bool
|
||||
GetIsClosed() bool
|
||||
|
||||
@@ -75,14 +75,14 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
||||
var lastMethodFrameMessage Message
|
||||
|
||||
for {
|
||||
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
|
||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
frame, err := r.ReadFrame()
|
||||
if err == io.EOF {
|
||||
// We must read until we see an EOF... very important!
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
switch f := frame.(type) {
|
||||
|
||||
@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
|
||||
func (reader *tcpReader) GetIsClosed() bool {
|
||||
return reader.isClosed
|
||||
}
|
||||
|
||||
func (reader *tcpReader) GetExtension() *api.Extension {
|
||||
return reader.extension
|
||||
}
|
||||
|
||||
@@ -7,18 +7,17 @@ import (
|
||||
)
|
||||
|
||||
type tcpStream struct {
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
isClosed bool
|
||||
protocol *api.Protocol
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpStream(capture api.Capture) api.TcpStream {
|
||||
return &tcpStream{
|
||||
origin: capture,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
origin: capture,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
||||
return t.origin
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
@@ -144,7 +144,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
||||
http2Assembler = createHTTP2Assembler(b)
|
||||
}
|
||||
|
||||
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol {
|
||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &http11protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
@@ -200,7 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
||||
}
|
||||
}
|
||||
|
||||
if reader.GetParent().GetProtoIdentifier().Protocol == nil {
|
||||
if reader.GetParent().GetProtocol() == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
|
||||
func (reader *tcpReader) GetIsClosed() bool {
|
||||
return reader.isClosed
|
||||
}
|
||||
|
||||
func (reader *tcpReader) GetExtension() *api.Extension {
|
||||
return reader.extension
|
||||
}
|
||||
|
||||
@@ -7,18 +7,17 @@ import (
|
||||
)
|
||||
|
||||
type tcpStream struct {
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
isClosed bool
|
||||
protocol *api.Protocol
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpStream(capture api.Capture) api.TcpStream {
|
||||
return &tcpStream{
|
||||
origin: capture,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
origin: capture,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
||||
return t.origin
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect
|
||||
|
||||
@@ -3,13 +3,14 @@ package kafka
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"golang.org/x/text/cases"
|
||||
"golang.org/x/text/language"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/text/cases"
|
||||
"golang.org/x/text/language"
|
||||
|
||||
"github.com/fatih/camelcase"
|
||||
"github.com/ohler55/ojg/jp"
|
||||
"github.com/ohler55/ojg/oj"
|
||||
@@ -36,9 +37,14 @@ type KafkaWrapper struct {
|
||||
|
||||
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
|
||||
requestHeader, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
Name: "ApiKeyName",
|
||||
Value: data["apiKeyName"].(string),
|
||||
Selector: `request.apiKeyName`,
|
||||
},
|
||||
{
|
||||
Name: "ApiKey",
|
||||
Value: apiNames[int(data["apiKey"].(float64))],
|
||||
Value: int(data["apiKey"].(float64)),
|
||||
Selector: `request.apiKey`,
|
||||
},
|
||||
{
|
||||
|
||||
@@ -38,7 +38,7 @@ func (d dissecting) Ping() {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
||||
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
|
||||
for {
|
||||
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
|
||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &_protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
@@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
statusQuery := ""
|
||||
|
||||
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
||||
method := apiNames[apiKey]
|
||||
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
|
||||
method := entry.Request["apiKeyName"].(string)
|
||||
methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method)
|
||||
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
type Request struct {
|
||||
Size int32 `json:"size"`
|
||||
ApiKeyName string `json:"apiKeyName"`
|
||||
ApiKey ApiKey `json:"apiKey"`
|
||||
ApiVersion int16 `json:"apiVersion"`
|
||||
CorrelationID int32 `json:"correlationID"`
|
||||
@@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca
|
||||
|
||||
request := &Request{
|
||||
Size: size,
|
||||
ApiKeyName: apiNames[apiKey],
|
||||
ApiKey: apiKey,
|
||||
ApiVersion: apiVersion,
|
||||
CorrelationID: correlationID,
|
||||
|
||||
@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
|
||||
func (reader *tcpReader) GetIsClosed() bool {
|
||||
return reader.isClosed
|
||||
}
|
||||
|
||||
func (reader *tcpReader) GetExtension() *api.Extension {
|
||||
return reader.extension
|
||||
}
|
||||
|
||||
@@ -7,18 +7,17 @@ import (
|
||||
)
|
||||
|
||||
type tcpStream struct {
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
isClosed bool
|
||||
protocol *api.Protocol
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpStream(capture api.Capture) api.TcpStream {
|
||||
return &tcpStream{
|
||||
origin: capture,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
origin: capture,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
||||
return t.origin
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
|
||||
func (reader *tcpReader) GetIsClosed() bool {
|
||||
return reader.isClosed
|
||||
}
|
||||
|
||||
func (reader *tcpReader) GetExtension() *api.Extension {
|
||||
return reader.extension
|
||||
}
|
||||
|
||||
@@ -7,18 +7,17 @@ import (
|
||||
)
|
||||
|
||||
type tcpStream struct {
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
isClosed bool
|
||||
protocol *api.Protocol
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpStream(capture api.Capture) api.TcpStream {
|
||||
return &tcpStream{
|
||||
origin: capture,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
origin: capture,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
||||
return t.origin
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
@@ -3,11 +3,9 @@ package tap
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
@@ -17,50 +15,48 @@ import (
|
||||
* Implements io.Reader interface (Read)
|
||||
*/
|
||||
type tcpReader struct {
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
captureTime time.Time
|
||||
parent *tcpStream
|
||||
packetsSeen uint
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
msgBuffer []api.TcpReaderDataMsg
|
||||
msgBufferMaster []api.TcpReaderDataMsg
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
captureTime time.Time
|
||||
parent *tcpStream
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
|
||||
func NewTcpReader(ident string, tcpId *api.TcpID, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader {
|
||||
return &tcpReader{
|
||||
msgQueue: msgQueue,
|
||||
progress: progress,
|
||||
ident: ident,
|
||||
tcpID: tcpId,
|
||||
captureTime: captureTime,
|
||||
parent: parent,
|
||||
isClient: isClient,
|
||||
isOutgoing: isOutgoing,
|
||||
extension: extension,
|
||||
emitter: emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
msgQueue: make(chan api.TcpReaderDataMsg),
|
||||
progress: &api.ReadProgress{},
|
||||
ident: ident,
|
||||
tcpID: tcpId,
|
||||
parent: parent,
|
||||
isClient: isClient,
|
||||
isOutgoing: isOutgoing,
|
||||
emitter: emitter,
|
||||
}
|
||||
}
|
||||
|
||||
func (reader *tcpReader) run(options *api.TrafficFilteringOptions, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
b := bufio.NewReader(reader)
|
||||
err := reader.extension.Dissector.Dissect(b, reader, options)
|
||||
if err != nil {
|
||||
_, err = io.Copy(ioutil.Discard, reader)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("%v", err)
|
||||
for i, extension := range extensions {
|
||||
reader.reqResMatcher = reader.parent.reqResMatchers[i]
|
||||
reader.counterPair = reader.parent.counterPairs[i]
|
||||
b := bufio.NewReader(reader)
|
||||
extension.Dissector.Dissect(b, reader, options) //nolint
|
||||
if reader.isProtocolIdentified() {
|
||||
break
|
||||
}
|
||||
reader.rewind()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,21 +77,60 @@ func (reader *tcpReader) sendMsgIfNotClosed(msg api.TcpReaderDataMsg) {
|
||||
reader.Unlock()
|
||||
}
|
||||
|
||||
func (reader *tcpReader) isProtocolIdentified() bool {
|
||||
return reader.parent.protocol != nil
|
||||
}
|
||||
|
||||
func (reader *tcpReader) rewind() {
|
||||
// Reset the data
|
||||
reader.data = make([]byte, 0)
|
||||
|
||||
// Reset msgBuffer from the master record
|
||||
reader.parent.Lock()
|
||||
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
|
||||
copy(reader.msgBuffer, reader.msgBufferMaster)
|
||||
reader.parent.Unlock()
|
||||
|
||||
// Reset the read progress
|
||||
reader.progress.Reset()
|
||||
}
|
||||
|
||||
func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) {
|
||||
reader.data = msg.GetBytes()
|
||||
reader.captureTime = msg.GetTimestamp()
|
||||
}
|
||||
|
||||
func (reader *tcpReader) Read(p []byte) (int, error) {
|
||||
var msg api.TcpReaderDataMsg
|
||||
|
||||
for len(reader.msgBuffer) > 0 && len(reader.data) == 0 {
|
||||
// Pop first message
|
||||
if len(reader.msgBuffer) > 1 {
|
||||
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
|
||||
} else {
|
||||
msg = reader.msgBuffer[0]
|
||||
reader.msgBuffer = make([]api.TcpReaderDataMsg, 0)
|
||||
}
|
||||
|
||||
// Get the bytes
|
||||
reader.populateData(msg)
|
||||
}
|
||||
|
||||
ok := true
|
||||
for ok && len(reader.data) == 0 {
|
||||
msg, ok = <-reader.msgQueue
|
||||
if msg != nil {
|
||||
reader.data = msg.GetBytes()
|
||||
reader.captureTime = msg.GetTimestamp()
|
||||
}
|
||||
reader.populateData(msg)
|
||||
|
||||
if len(reader.data) > 0 {
|
||||
reader.packetsSeen += 1
|
||||
if !reader.isProtocolIdentified() {
|
||||
reader.msgBufferMaster = append(
|
||||
reader.msgBufferMaster,
|
||||
msg,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !ok || len(reader.data) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
@@ -142,7 +177,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
|
||||
func (reader *tcpReader) GetIsClosed() bool {
|
||||
return reader.isClosed
|
||||
}
|
||||
|
||||
func (reader *tcpReader) GetExtension() *api.Extension {
|
||||
return reader.extension
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
)
|
||||
|
||||
@@ -16,10 +15,10 @@ type tcpReassemblyStream struct {
|
||||
fsmerr bool
|
||||
optchecker reassembly.TCPOptionCheck
|
||||
isDNS bool
|
||||
tcpStream api.TcpStream
|
||||
tcpStream *tcpStream
|
||||
}
|
||||
|
||||
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream api.TcpStream) reassembly.Stream {
|
||||
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream *tcpStream) reassembly.Stream {
|
||||
return &tcpReassemblyStream{
|
||||
ident: ident,
|
||||
tcpState: reassembly.NewTCPSimpleFSM(fsmOptions),
|
||||
@@ -139,17 +138,10 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
|
||||
// This channel is read by an tcpReader object
|
||||
diagnose.AppStats.IncReassembledTcpPayloadsCount()
|
||||
timestamp := ac.GetCaptureInfo().Timestamp
|
||||
stream := t.tcpStream.(*tcpStream)
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
for i := range stream.getClients() {
|
||||
reader := stream.getClient(i)
|
||||
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||
}
|
||||
t.tcpStream.client.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||
} else {
|
||||
for i := range stream.getServers() {
|
||||
reader := stream.getServer(i)
|
||||
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||
}
|
||||
t.tcpStream.server.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -157,7 +149,7 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
|
||||
|
||||
func (t *tcpReassemblyStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
|
||||
if t.tcpStream.GetIsTapTarget() && !t.tcpStream.GetIsClosed() {
|
||||
t.tcpStream.(*tcpStream).close()
|
||||
t.tcpStream.close()
|
||||
}
|
||||
// do not remove the connection to allow last ACK
|
||||
return false
|
||||
|
||||
@@ -13,25 +13,26 @@ import (
|
||||
* In our implementation, we pass information from ReassembledSG to the TcpReader through a shared channel.
|
||||
*/
|
||||
type tcpStream struct {
|
||||
id int64
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
clients []*tcpReader
|
||||
servers []*tcpReader
|
||||
origin api.Capture
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
createdAt time.Time
|
||||
streamsMap api.TcpStreamMap
|
||||
id int64
|
||||
isClosed bool
|
||||
protocol *api.Protocol
|
||||
isTapTarget bool
|
||||
client *tcpReader
|
||||
server *tcpReader
|
||||
origin api.Capture
|
||||
counterPairs []*api.CounterPair
|
||||
reqResMatchers []api.RequestResponseMatcher
|
||||
createdAt time.Time
|
||||
streamsMap api.TcpStreamMap
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream {
|
||||
return &tcpStream{
|
||||
isTapTarget: isTapTarget,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
streamsMap: streamsMap,
|
||||
origin: capture,
|
||||
isTapTarget: isTapTarget,
|
||||
streamsMap: streamsMap,
|
||||
origin: capture,
|
||||
createdAt: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,38 +56,12 @@ func (t *tcpStream) close() {
|
||||
|
||||
t.streamsMap.Delete(t.id)
|
||||
|
||||
for i := range t.clients {
|
||||
reader := t.clients[i]
|
||||
reader.close()
|
||||
}
|
||||
for i := range t.servers {
|
||||
reader := t.servers[i]
|
||||
reader.close()
|
||||
}
|
||||
t.client.close()
|
||||
t.server.close()
|
||||
}
|
||||
|
||||
func (t *tcpStream) addClient(reader *tcpReader) {
|
||||
t.clients = append(t.clients, reader)
|
||||
}
|
||||
|
||||
func (t *tcpStream) addServer(reader *tcpReader) {
|
||||
t.servers = append(t.servers, reader)
|
||||
}
|
||||
|
||||
func (t *tcpStream) getClients() []*tcpReader {
|
||||
return t.clients
|
||||
}
|
||||
|
||||
func (t *tcpStream) getServers() []*tcpReader {
|
||||
return t.servers
|
||||
}
|
||||
|
||||
func (t *tcpStream) getClient(index int) *tcpReader {
|
||||
return t.clients[index]
|
||||
}
|
||||
|
||||
func (t *tcpStream) getServer(index int) *tcpReader {
|
||||
return t.servers[index]
|
||||
func (t *tcpStream) addCounterPair(counterPair *api.CounterPair) {
|
||||
t.counterPairs = append(t.counterPairs, counterPair)
|
||||
}
|
||||
|
||||
func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
|
||||
@@ -94,37 +69,21 @@ func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
|
||||
}
|
||||
|
||||
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
||||
t.protocol = protocol
|
||||
|
||||
// Clean the buffers
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
if t.protoIdentifier.IsClosedOthers {
|
||||
return
|
||||
}
|
||||
|
||||
t.protoIdentifier.Protocol = protocol
|
||||
|
||||
for i := range t.clients {
|
||||
reader := t.clients[i]
|
||||
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
||||
reader.close()
|
||||
}
|
||||
}
|
||||
for i := range t.servers {
|
||||
reader := t.servers[i]
|
||||
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
||||
reader.close()
|
||||
}
|
||||
}
|
||||
|
||||
t.protoIdentifier.IsClosedOthers = true
|
||||
t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
|
||||
t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
|
||||
t.Unlock()
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetOrigin() api.Capture {
|
||||
return t.origin
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
@@ -3,7 +3,6 @@ package tap
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
@@ -62,62 +61,50 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
|
||||
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
|
||||
if stream.GetIsTapTarget() {
|
||||
stream.setId(factory.streamsMap.NextId())
|
||||
for i, extension := range extensions {
|
||||
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
||||
stream.addReqResMatcher(reqResMatcher)
|
||||
for _, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
stream.addClient(
|
||||
NewTcpReader(
|
||||
make(chan api.TcpReaderDataMsg),
|
||||
&api.ReadProgress{},
|
||||
fmt.Sprintf("%s %s", net, transport),
|
||||
&api.TcpID{
|
||||
SrcIP: srcIp,
|
||||
DstIP: dstIp,
|
||||
SrcPort: srcPort,
|
||||
DstPort: dstPort,
|
||||
},
|
||||
time.Time{},
|
||||
stream,
|
||||
true,
|
||||
props.isOutgoing,
|
||||
extension,
|
||||
factory.emitter,
|
||||
counterPair,
|
||||
reqResMatcher,
|
||||
),
|
||||
)
|
||||
stream.addServer(
|
||||
NewTcpReader(
|
||||
make(chan api.TcpReaderDataMsg),
|
||||
&api.ReadProgress{},
|
||||
fmt.Sprintf("%s %s", net, transport),
|
||||
&api.TcpID{
|
||||
SrcIP: net.Dst().String(),
|
||||
DstIP: net.Src().String(),
|
||||
SrcPort: transport.Dst().String(),
|
||||
DstPort: transport.Src().String(),
|
||||
},
|
||||
time.Time{},
|
||||
stream,
|
||||
false,
|
||||
props.isOutgoing,
|
||||
extension,
|
||||
factory.emitter,
|
||||
counterPair,
|
||||
reqResMatcher,
|
||||
),
|
||||
)
|
||||
stream.addCounterPair(counterPair)
|
||||
|
||||
factory.streamsMap.Store(stream.getId(), stream)
|
||||
|
||||
factory.wg.Add(2)
|
||||
go stream.getClient(i).run(filteringOptions, &factory.wg)
|
||||
go stream.getServer(i).run(filteringOptions, &factory.wg)
|
||||
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
||||
stream.addReqResMatcher(reqResMatcher)
|
||||
}
|
||||
|
||||
stream.client = NewTcpReader(
|
||||
fmt.Sprintf("%s %s", net, transport),
|
||||
&api.TcpID{
|
||||
SrcIP: srcIp,
|
||||
DstIP: dstIp,
|
||||
SrcPort: srcPort,
|
||||
DstPort: dstPort,
|
||||
},
|
||||
stream,
|
||||
true,
|
||||
props.isOutgoing,
|
||||
factory.emitter,
|
||||
)
|
||||
|
||||
stream.server = NewTcpReader(
|
||||
fmt.Sprintf("%s %s", net, transport),
|
||||
&api.TcpID{
|
||||
SrcIP: net.Dst().String(),
|
||||
DstIP: net.Src().String(),
|
||||
SrcPort: transport.Dst().String(),
|
||||
DstPort: transport.Src().String(),
|
||||
},
|
||||
stream,
|
||||
false,
|
||||
props.isOutgoing,
|
||||
factory.emitter,
|
||||
)
|
||||
|
||||
factory.streamsMap.Store(stream.getId(), stream)
|
||||
|
||||
factory.wg.Add(2)
|
||||
go stream.client.run(filteringOptions, &factory.wg)
|
||||
go stream.server.run(filteringOptions, &factory.wg)
|
||||
}
|
||||
return reassemblyStream
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() {
|
||||
return true
|
||||
}
|
||||
|
||||
if stream.protoIdentifier.Protocol == nil {
|
||||
if stream.protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
|
||||
stream.close()
|
||||
diagnose.AppStats.IncDroppedTcpStreams()
|
||||
|
||||
@@ -68,7 +68,7 @@ struct fd_info {
|
||||
BPF_HASH(pids_map, __u32, __u32);
|
||||
BPF_LRU_HASH(ssl_write_context, __u64, struct ssl_info);
|
||||
BPF_LRU_HASH(ssl_read_context, __u64, struct ssl_info);
|
||||
BPF_HASH(file_descriptor_to_ipv4, __u64, struct fd_info);
|
||||
BPF_LRU_HASH(file_descriptor_to_ipv4, __u64, struct fd_info);
|
||||
BPF_PERF_OUTPUT(chunks_buffer);
|
||||
BPF_PERF_OUTPUT(log_buffer);
|
||||
|
||||
|
||||
@@ -188,8 +188,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key
|
||||
}
|
||||
|
||||
stream := &tlsStream{
|
||||
reader: reader,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
reader: reader,
|
||||
}
|
||||
streamsMap.Store(streamsMap.NextId(), stream)
|
||||
|
||||
|
||||
@@ -94,7 +94,3 @@ func (r *tlsReader) GetEmitter() api.Emitter {
|
||||
func (r *tlsReader) GetIsClosed() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *tlsReader) GetExtension() *api.Extension {
|
||||
return r.extension
|
||||
}
|
||||
|
||||
@@ -3,20 +3,20 @@ package tlstapper
|
||||
import "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
type tlsStream struct {
|
||||
reader *tlsReader
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
reader *tlsReader
|
||||
protocol *api.Protocol
|
||||
}
|
||||
|
||||
func (t *tlsStream) GetOrigin() api.Capture {
|
||||
return api.Ebpf
|
||||
}
|
||||
|
||||
func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return t.protoIdentifier
|
||||
func (t *tlsStream) GetProtocol() *api.Protocol {
|
||||
return t.protocol
|
||||
}
|
||||
|
||||
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
|
||||
t.protoIdentifier.Protocol = protocol
|
||||
t.protocol = protocol
|
||||
}
|
||||
|
||||
func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -67,7 +67,6 @@
|
||||
height: 100%
|
||||
display: flex
|
||||
flex-direction: column
|
||||
margin-right: 10px
|
||||
width: 100%
|
||||
border-radius: 4px
|
||||
|
||||
@@ -82,17 +81,18 @@
|
||||
margin-top: 10px
|
||||
margin-right: 10px
|
||||
|
||||
.protocolsFilterList, .servicesFilter
|
||||
.card
|
||||
background: white
|
||||
padding: 10px
|
||||
border-radius: 4px
|
||||
user-select: none
|
||||
box-shadow: 0px 1px 5px #979797
|
||||
|
||||
.servicesFilter
|
||||
margin-top: 10px
|
||||
.servicesFilterWrapper
|
||||
margin-top: 20px
|
||||
margin-bottom: 3px
|
||||
height: 100%
|
||||
overflow: hidden
|
||||
border-radius: 4px
|
||||
|
||||
& .servicesFilterList
|
||||
height: calc(100% - 30px - 52px)
|
||||
.servicesFilterList
|
||||
height: calc(100% - 30px - 52px)
|
||||
|
||||
@@ -158,6 +158,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
|
||||
if (checkedProtocols.length === 0) {
|
||||
setCheckedProtocols(getProtocolsForFilter.map(x => x.key))
|
||||
}
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [getProtocolsForFilter])
|
||||
|
||||
useEffect(() => {
|
||||
@@ -217,13 +218,13 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
|
||||
<div className={styles.filterSection + ` ${isFilterClicked ? styles.show : ""}`}>
|
||||
<Resizeable minWidth={170} maxWidth={320}>
|
||||
<div className={styles.filterWrapper}>
|
||||
<div className={styles.protocolsFilterList}>
|
||||
<div className={styles.card}>
|
||||
<SelectList items={getProtocolsForFilter} checkBoxWidth="5%" tableName={"PROTOCOLS"} multiSelect={true}
|
||||
checkedValues={checkedProtocols} setCheckedValues={onProtocolsChange} tableClassName={styles.filters}
|
||||
inputSearchClass={styles.servicesFilterSearch} isFilterable={false}/>
|
||||
</div>
|
||||
<div className={styles.servicesFilter}>
|
||||
<div className={styles.servicesFilterList}>
|
||||
<div className={styles.servicesFilterWrapper + ` ${styles.card}`}>
|
||||
<div className={styles.servicesFilterList}>
|
||||
<SelectList items={getServicesForFilter} tableName={"SERVICES"} tableClassName={styles.filters} multiSelect={true}
|
||||
checkBoxWidth="5%" checkedValues={checkedServices} setCheckedValues={onServiceChanges} inputSearchClass={styles.servicesFilterSearch}/>
|
||||
</div>
|
||||
|
||||
@@ -85,7 +85,7 @@ const SelectList: React.FC<Props> = ({ items, tableName, checkedValues = [], mul
|
||||
|
||||
const tableBody = filteredValues.length === 0 ?
|
||||
<tr>
|
||||
<td colSpan={2}>
|
||||
<td colSpan={2} className={styles.displayBlock}>
|
||||
<NoDataMessage messageText={noItemsMessage} />
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
@@ -22,8 +22,8 @@ export const StatusBar: React.FC<StatusBarProps> = ({isDemoBannerView, disabled}
|
||||
const {uniqueNamespaces, amountOfPods, amountOfTappedPods, amountOfUntappedPods} = useRecoilValue(tappingStatusDetails);
|
||||
return <div style={{opacity: disabled ? 0.4 : 1}} className={`${isDemoBannerView ? `${style.banner}` : ''} ${style.statusBar} ${(expandedBar && !disabled ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)} data-cy="expandedStatusBar">
|
||||
<div className={style.podsCount}>
|
||||
{tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
|
||||
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
|
||||
{!disabled && tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
|
||||
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
|
||||
<span className={style.podsCountText} data-cy="podsCountText">
|
||||
{`Tapping ${amountOfUntappedPods > 0 ? amountOfTappedPods + " / " + amountOfPods : amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
|
||||
</span>
|
||||
|
||||
@@ -1,44 +1,70 @@
|
||||
@import '../../../variables.module'
|
||||
@import '../../../components'
|
||||
|
||||
|
||||
.selectListTable
|
||||
overflow: auto
|
||||
height: 100%
|
||||
user-select: none // when resizble moved we get unwanted beheviour
|
||||
height: 100%
|
||||
|
||||
table
|
||||
width: 100%
|
||||
margin-top: 20px
|
||||
border-collapse: collapse
|
||||
table-layout: fixed
|
||||
height: 100%
|
||||
display: flex
|
||||
flex-flow: column
|
||||
height: 100%
|
||||
|
||||
th
|
||||
color: $blue-gray
|
||||
text-align: left
|
||||
padding: 10px
|
||||
position: sticky
|
||||
top: 0
|
||||
background: $main-background-color
|
||||
font-size: 12px
|
||||
thead
|
||||
display: table
|
||||
table-layout: fixed
|
||||
flex: 0 0 auto
|
||||
width: calc(100% - 0.9em)
|
||||
|
||||
tr
|
||||
border-bottom-width: 1px
|
||||
border-bottom-color: $data-background-color
|
||||
border-bottom-style: solid
|
||||
width: 100%
|
||||
tbody
|
||||
display: block
|
||||
overflow: auto
|
||||
width: 100%
|
||||
height: 100%
|
||||
flex: 1 1 auto
|
||||
|
||||
td
|
||||
color: $light-gray
|
||||
padding: 10px
|
||||
font-size: 11px
|
||||
font-weight: 600
|
||||
padding-top: 5px
|
||||
padding-bottom: 5px
|
||||
tbody tr:hover
|
||||
background: $header-background-color
|
||||
|
||||
th
|
||||
color: $blue-gray
|
||||
text-align: left
|
||||
padding: 10px
|
||||
background: $main-background-color
|
||||
font-size: 12px
|
||||
|
||||
tr
|
||||
border-bottom-width: 1px
|
||||
border-bottom-color: $data-background-color
|
||||
border-bottom-style: solid
|
||||
width: 100%
|
||||
display: block
|
||||
position: relative
|
||||
display: table
|
||||
table-layout: fixed
|
||||
|
||||
td
|
||||
color: $light-gray
|
||||
padding: 10px
|
||||
font-size: 11px
|
||||
font-weight: 600
|
||||
padding-top: 5px
|
||||
padding-bottom: 5px
|
||||
|
||||
.nowrap
|
||||
white-space: nowrap
|
||||
|
||||
.totalSelected
|
||||
font-size: 12px
|
||||
color: $light-blue-color
|
||||
font-weight: 700
|
||||
font-size: 12px
|
||||
color: $light-blue-color
|
||||
font-weight: 700
|
||||
|
||||
.displayBlock
|
||||
display: block
|
||||
|
||||
.filterInput
|
||||
margin-bottom: 20px
|
||||
|
||||
Reference in New Issue
Block a user