Compare commits

...

11 Commits

Author SHA1 Message Date
David Levanon
a9de4f0bba stop tapping self tapper traffic (#1083)
* stop tapping self tapper traffic

* run go mod tidy

* allow ports to be explicitly ignored

* remove unused code

* remove shared from tap + go mod tidy

* move ignore ports to tapper

* rename TapperPacketsCount to IgnoredPacketsCount

* don't check null - go is smart

* remove nil check
2022-05-18 15:13:10 +03:00
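
A minimal sketch of the idea behind this commit's "-ignore-ports" flag (the flag name appears in the tapper diff below; this helper is illustrative and not the exact buildIgnoredPortsList added in the PR, which keeps a zero entry when parsing fails):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIgnoredPorts turns a comma-separated "-ignore-ports" value into a
// []uint16, silently dropping entries that are not valid port numbers.
func parseIgnoredPorts(raw string) []uint16 {
	var result []uint16
	for _, field := range strings.Split(raw, ",") {
		v, err := strconv.ParseUint(strings.TrimSpace(field), 10, 16)
		if err != nil {
			continue
		}
		result = append(result, uint16(v))
	}
	return result
}

func main() {
	fmt.Println(parseIgnoredPorts("8897,9100,not-a-port")) // [8897 9100]
}
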
lirazyehezkel
948af518b5 Fix double info icon in tapping status bar (#1095) 2022-05-18 14:40:20 +03:00
M. Mert Yıldıran
73448b514e Fix checkFilter method in the acceptance tests (#1092)
* Fix `checkFilter` method in the acceptance tests

* #run_acceptance_tests

* Remove duplicate assertion #run_acceptance_tests
2022-05-17 14:41:08 +03:00
M. Mert Yıldıran
fc194354bc Fix a nil pointer dereference error that occurs in tcpReader's Read method (#1090)
* Fix a `nil pointer dereference` error that occurs in `tcpReader`'s `Read` method

* #run_acceptance_tests

* #run_acceptance_tests

* Revert "Fix a `nil pointer dereference` error that occurs in `tcpReader`'s `Read` method"

This reverts commit ccef6cb393.

* Fix the race condition using locks #run_acceptance_tests
2022-05-17 13:13:50 +03:00
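
A loose, self-contained sketch of the locking pattern this fix refers to (all names here are stand-ins, not the actual mizu tcpReader): the reader's mutex guards its closed flag so a concurrent close cannot race with a send.

package main

import (
	"fmt"
	"sync"
)

type msgReader struct {
	sync.Mutex
	isClosed bool
	queue    chan string
}

// sendIfNotClosed checks the closed flag under the lock before sending.
func (r *msgReader) sendIfNotClosed(msg string) {
	r.Lock()
	defer r.Unlock()
	if !r.isClosed {
		r.queue <- msg
	}
}

// close marks the reader closed and closes the channel exactly once.
func (r *msgReader) close() {
	r.Lock()
	defer r.Unlock()
	if !r.isClosed {
		r.isClosed = true
		close(r.queue)
	}
}

func main() {
	r := &msgReader{queue: make(chan string, 1)}
	r.sendIfNotClosed("hello")
	r.close()
	fmt.Println(<-r.queue) // hello
}
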
M. Mert Yıldıran
8418802c7e Handle the wait for fetch in acceptance tests better (#1088)
* Handle the wait for fetch in acceptance tests better

* #run_acceptance_tests

* Fix the error #run_acceptance_tests

* Fix `waitForFetchAndPause` and `checkFilter` #run_acceptance_tests

* Fix the tests #run_acceptance_tests
2022-05-17 11:01:47 +03:00
M. Mert Yıldıran
bfa834e840 Spawn only two Goroutines per TCP stream (#1062)
* Spawn only two Goroutines per TCP stream

* Fix the linter error

* Use `isProtocolIdentified` method instead

* Fix the `Read` method of `tcpReader`

* Remove unnecessary `append`

* Copy to buffer only when a message is received

* Remove `exhaustBuffer` field and add `rewind` function

* Rename `buffer` field to `pastData`

* Update tap/tcp_reader.go

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>

* Use `copy` instead of assignment

* No lint

* #run_acceptance_tests

* Fix `rewind` #run_acceptance_tests

* Fix the buffering algorithm #run_acceptance_tests

* Add `TODO`

* Fix the problems in AMQP and Kafka #run_acceptance_tests

* Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests

* Have a single `*bytes.Buffer`

* Revert "Have a single `*bytes.Buffer`"

This reverts commit fad96a288a.

* Revert "Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests"

This reverts commit 0fc70bffe2.

* Fix the early timing out issue #run_acceptance_tests

* Remove `NewBytes()` method

* Update the `NewTcpReader` method signature #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-05-16 16:06:36 +03:00
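
A minimal, self-contained sketch of the buffering idea behind this commit (all types here are simplified stand-ins, not the mizu tap API): one reader keeps the raw messages it has seen and replays ("rewinds") them for each candidate dissector until one of them identifies the protocol, instead of spawning a goroutine per extension.

package main

import (
	"bytes"
	"fmt"
)

type dissector interface {
	Name() string
	Dissect(data [][]byte) bool // true if the protocol was identified
}

type reader struct {
	msgBufferMaster [][]byte // everything seen so far, kept until identified
	protocol        string
}

// run tries each dissector against the same buffered data, rewinding between
// attempts, and drops the buffer once a protocol is identified.
func (r *reader) run(dissectors []dissector) {
	for _, d := range dissectors {
		buffered := append([][]byte(nil), r.msgBufferMaster...)
		if d.Dissect(buffered) {
			r.protocol = d.Name()
			r.msgBufferMaster = nil
			return
		}
	}
}

type httpLike struct{}

func (httpLike) Name() string { return "http" }
func (httpLike) Dissect(data [][]byte) bool {
	return len(data) > 0 && bytes.HasPrefix(data[0], []byte("GET"))
}

func main() {
	r := &reader{msgBufferMaster: [][]byte{[]byte("GET / HTTP/1.1")}}
	r.run([]dissector{httpLike{}})
	fmt.Println(r.protocol) // http
}
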
David Levanon
5c012641a5 Feature/limit fd to address map (#1078)
* limit fd to address map

* limit fd to address map

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-05-16 12:32:26 +03:00
Igor Gov
74bd4b180f Removing elastic dump experimental feature (#1086)
* Removing elastic dump experimental feature
2022-05-15 15:23:03 +03:00
RoyUP9
8ea2dabb34 Added send to socket error validation (#1085) 2022-05-15 14:44:48 +03:00
leon-up9
366d34b8d0 Ui/fix/selectList-sticky-header-fix (#1084)
* selectList sticky header

* selectList changed & servicemap adapted

* ignore eslint

Co-authored-by: Leon <>
Co-authored-by: AmitUp9 <96980485+AmitUp9@users.noreply.github.com>
2022-05-15 14:39:09 +03:00
M. Mert Yıldıran
5fc3e38c1a Fix the Kafka ApiKey query and add ApiKeyName field (human-readable ApiKey) (#1080)
* Fix the Kafka `ApiKey` query and add `ApiKeyName` field (human-readable `ApiKey`)

* Update the dataset for Kafka unit tests

* #run_acceptance_tests
2022-05-15 09:42:32 +03:00
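
A rough sketch of the shape of this change (types simplified, not the actual Kafka dissector code): the request now carries a human-readable ApiKeyName alongside the numeric ApiKey, and the summary filter query is built from the name rather than the number.

package main

import "fmt"

// A few well-known Kafka API keys for illustration.
var apiNames = map[int16]string{0: "Produce", 1: "Fetch", 3: "Metadata"}

type kafkaRequest struct {
	ApiKeyName string `json:"apiKeyName"`
	ApiKey     int16  `json:"apiKey"`
}

func main() {
	req := kafkaRequest{ApiKeyName: apiNames[1], ApiKey: 1}
	methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, req.ApiKeyName)
	fmt.Println(methodQuery) // request.apiKeyName == "Fetch"
}
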
47 changed files with 421 additions and 519 deletions

View File

@@ -193,58 +193,56 @@ function checkFilter(filterDetails) {
const entriesForDeeperCheck = 5;
it(`checking the filter: ${filter}`, function () {
waitForFetch50AndPause();
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
if (!applyByCtrlEnter)
cy.get('[type="submit"]').click();
cy.get('#total-entries').should('not.have.text', '0').then(number => {
const totalEntries = number.text();
waitForFetch();
pauseStream();
cy.get(`#list [id^=entry]`).last().then(elem => {
const element = elem[0];
const entryId = getEntryId(element.id);
// checks the hover on the last entry (the only one in DOM at the beginning)
leftOnHoverCheck(entryId, leftSidePath, filter);
cy.get(`#list [id^=entry]`).last().then(elem => {
const element = elem[0];
const entryId = getEntryId(element.id);
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
if (!applyByCtrlEnter)
cy.get('[type="submit"]').click();
// only one entry in DOM after filtering, checking all checks on it
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
leftOnHoverCheck(entryId, leftSidePath, filter);
waitForFetch50AndPause();
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, filter);
checkRightSideResponseBody();
});
// only one entry in DOM after filtering, checking all checks on it
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
leftOnHoverCheck(entryId, leftSidePath, filter);
resizeToHugeMizu();
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, filter);
checkRightSideResponseBody();
});
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
cy.get(`#list [id^=entry]`).each(elem => {
const element = elem[0];
let entryId = getEntryId(element.id);
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
});
resizeToHugeMizu();
// making the other 3 checks on the first X entries (longer time for each check)
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
cy.get(`#list [id^=entry]`).each(elem => {
const element = elem[0];
let entryId = getEntryId(element.id);
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
});
// making the other 3 checks on the first X entries (longer time for each check)
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
// reloading then waiting for the entries number to load
resizeToNormalMizu();
cy.reload();
cy.get('#total-entries', {timeout: refreshWaitTimeout}).should('have.text', totalEntries);
})
// reloading then waiting for the entries number to load
resizeToNormalMizu();
cy.reload();
waitForFetch();
pauseStream();
});
}
function waitForFetch50AndPause() {
// wait half a second and pause the stream to preserve the DOM
cy.wait(500);
function waitForFetch() {
cy.get('#entries-length', {timeout: refreshWaitTimeout}).should((el) => {
expect(parseInt(el.text().trim(), 10)).to.be.greaterThan(20);
});
}
function pauseStream() {
cy.get('#pause-icon').click();
cy.get('#pause-icon').should('not.be.visible');
}

View File

@@ -6,7 +6,6 @@ require (
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
github.com/chanced/openapi v0.0.8
github.com/djherbis/atime v1.1.0
github.com/elastic/go-elasticsearch/v7 v7.17.0
github.com/getkin/kin-openapi v0.89.0
github.com/gin-contrib/static v0.0.1
github.com/gin-gonic/gin v1.7.7

View File

@@ -166,8 +166,6 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/elastic/go-elasticsearch/v7 v7.17.0 h1:0fcSh4qeC/i1+7QU1KXpmq2iUAdMk4l0/vmbtW1+KJM=
github.com/elastic/go-elasticsearch/v7 v7.17.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=

View File

@@ -17,7 +17,6 @@ import (
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/entries"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/models"
@@ -153,7 +152,9 @@ func runInTapperMode() {
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapOpts := &tap.TapOpts{
HostMode: hostMode,
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
@@ -205,7 +206,6 @@ func enableExpFeatureIfNeeded() {
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
serviceMapGenerator.Enable()
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
func getSyncEntriesConfig() *shared.SyncEntriesConfig {

View File

@@ -5,20 +5,19 @@ import (
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
type EntryStreamerSocketConnector interface {
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
SendMetadata(socketId int, metadata *basenine.Metadata)
SendToastError(socketId int, err error)
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error
SendMetadata(socketId int, metadata *basenine.Metadata) error
SendToastError(socketId int, err error) error
CleanupSocket(socketId int)
}
type DefaultEntryStreamerSocketConnector struct{}
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error {
var message []byte
if params.EnableFullEntries {
message, _ = models.CreateFullEntryWebSocketMessage(entry)
@@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
}
if err := SendToSocket(socketId, message); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error {
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
if err := SendToSocket(socketId, metadataBytes); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error {
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
Type: "error",
AutoClose: 5000,
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
})
if err := SendToSocket(socketId, toastBytes); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {

View File

@@ -14,7 +14,6 @@ import (
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/holder"
"github.com/up9inc/mizu/agent/pkg/providers"
@@ -153,8 +152,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
elastic.GetInstance().PushEntry(mizuEntry)
}
}

View File

@@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
meta := make(chan []byte)
query := params.Query
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
if err != nil {
entryStreamerSocketConnector.SendToastError(socketId, err)
if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
return err
}
entryStreamerSocketConnector.CleanupSocket(socketId)
return err
}
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
if err != nil {
logger.Log.Errorf("Fetch error: %v", err.Error())
logger.Log.Errorf("Fetch error: %v", err)
}
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
@@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
}
var entry *tapApi.Entry
err = json.Unmarshal(bytes, &entry)
if err != nil {
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
if err = json.Unmarshal(bytes, &entry); err != nil {
logger.Log.Debugf("Error unmarshalling entry: %v", err)
continue
}
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
return
}
}
}
@@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
}
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
if err = json.Unmarshal(bytes, &metadata); err != nil {
logger.Log.Debugf("Error unmarshalling metadata: %v", err)
continue
}
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
return
}
}
}
@@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con
}
var firstMetadata *basenine.Metadata
err = json.Unmarshal(firstMeta, &firstMetadata)
if err != nil {
if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
return
}
leftOff = firstMetadata.LeftOff
var lastMetadata *basenine.Metadata
err = json.Unmarshal(lastMeta, &lastMetadata)
if err != nil {
if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
return
}
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
return
}
connector.SendMetadata(socketId, lastMetadata)
data = e.reverseBytesSlice(data)
for _, row := range data {
var entry *tapApi.Entry
err = json.Unmarshal(row, &entry)
if err != nil {
if err = json.Unmarshal(row, &entry); err != nil {
break
}
connector.SendEntry(socketId, entry, params)
if err = connector.SendEntry(socketId, entry, params); err != nil {
return
}
}
return
}

View File

@@ -1,116 +0,0 @@
package elastic
import (
"bytes"
"crypto/tls"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
)
type client struct {
es *elasticsearch.Client
index string
insertedCount int
}
var instance *client
var once sync.Once
func GetInstance() *client {
once.Do(func() {
instance = newClient()
})
return instance
}
func (client *client) Configure(config shared.ElasticConfig) {
if config.Url == "" || config.User == "" || config.Password == "" {
if client.es != nil {
client.es = nil
}
logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled")
return
}
transport := http.DefaultTransport
tlsClientConfig := &tls.Config{InsecureSkipVerify: true}
transport.(*http.Transport).TLSClientConfig = tlsClientConfig
cfg := elasticsearch.Config{
Addresses: []string{config.Url},
Username: config.User,
Password: config.Password,
Transport: transport,
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
logger.Log.Errorf("Failed to initialize elastic client %v", err)
}
// Have the client instance return a response
res, err := es.Info()
if err != nil {
logger.Log.Errorf("Elastic client.Info() ERROR: %v", err)
} else {
client.es = es
client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04")
client.insertedCount = 0
logger.Log.Infof("Elastic client configured, index: %s, cluster info: %v", client.index, res)
}
defer res.Body.Close()
}
func newClient() *client {
return &client{
es: nil,
index: "",
}
}
type httpEntry struct {
Source *api.TCP `json:"src"`
Destination *api.TCP `json:"dst"`
Outgoing bool `json:"outgoing"`
CreatedAt time.Time `json:"createdAt"`
Request map[string]interface{} `json:"request"`
Response map[string]interface{} `json:"response"`
ElapsedTime int64 `json:"elapsedTime"`
}
func (client *client) PushEntry(entry *api.Entry) {
if client.es == nil {
return
}
if entry.Protocol.Name != "http" {
return
}
entryToPush := httpEntry{
Source: entry.Source,
Destination: entry.Destination,
Outgoing: entry.Outgoing,
CreatedAt: entry.StartTime,
Request: entry.Request,
Response: entry.Response,
ElapsedTime: entry.ElapsedTime,
}
entryJson, err := json.Marshal(entryToPush)
if err != nil {
logger.Log.Errorf("json.Marshal ERROR: %v", err)
return
}
var buffer bytes.Buffer
buffer.WriteString(string(entryJson))
res, _ := client.es.Index(client.index, &buffer)
if res.StatusCode == 201 {
client.insertedCount += 1
}
}

View File

@@ -164,7 +164,6 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
ServiceMap: config.Config.ServiceMap,
OAS: config.Config.OAS,
Telemetry: config.Config.Telemetry,
Elastic: config.Config.Elastic,
}
return &mizuAgentConfig

View File

@@ -41,7 +41,6 @@ type ConfigStruct struct {
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
ServiceMap bool `yaml:"service-map" default:"true"`
OAS bool `yaml:"oas" default:"true"`
Elastic shared.ElasticConfig `yaml:"elastic"`
}
func (config *ConfigStruct) validate() error {

View File

@@ -45,13 +45,6 @@ type MizuAgentConfig struct {
ServiceMap bool `json:"serviceMap"`
OAS bool `json:"oas"`
Telemetry bool `json:"telemetry"`
Elastic ElasticConfig `json:"elastic"`
}
type ElasticConfig struct {
User string `yaml:"user,omitempty" default:"" readonly:""`
Password string `yaml:"password,omitempty" default:"" readonly:""`
Url string `yaml:"url,omitempty" default:"" readonly:""`
}
type WebSocketMessageMetadata struct {

View File

@@ -104,11 +104,6 @@ type OutputChannelItem struct {
Namespace string
}
type ProtoIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
type ReadProgress struct {
readBytes int
lastCurrent int
@@ -123,6 +118,11 @@ func (p *ReadProgress) Current() (n int) {
return p.lastCurrent
}
func (p *ReadProgress) Reset() {
p.readBytes = 0
p.lastCurrent = 0
}
type Dissector interface {
Register(*Extension)
Ping()
@@ -419,13 +419,12 @@ type TcpReader interface {
GetCaptureTime() time.Time
GetEmitter() Emitter
GetIsClosed() bool
GetExtension() *Extension
}
type TcpStream interface {
SetProtocol(protocol *Protocol)
GetOrigin() Capture
GetProtoIdentifier() *ProtoIdentifier
GetProtocol() *Protocol
GetReqResMatchers() []RequestResponseMatcher
GetIsTapTarget() bool
GetIsClosed() bool

View File

@@ -10,6 +10,7 @@ type AppStats struct {
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
IgnoredPacketsCount uint64 `json:"ignoredPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
@@ -33,6 +34,10 @@ func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncIgnoredPacketsCount() {
atomic.AddUint64(&as.IgnoredPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
@@ -55,6 +60,7 @@ func (as *AppStats) DumpStats() *AppStats {
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.IgnoredPacketsCount = resetUint64(&as.IgnoredPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)

View File

@@ -75,14 +75,14 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
var lastMethodFrameMessage Message
for {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &protocol {
return errors.New("Identified by another protocol")
}
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return nil
return err
}
switch f := frame.(type) {

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,17 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
protocol *api.Protocol
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tcpStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -144,7 +144,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
http2Assembler = createHTTP2Assembler(b)
}
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol {
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &http11protocol {
return errors.New("Identified by another protocol")
}
@@ -200,7 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
}
}
if reader.GetParent().GetProtoIdentifier().Protocol == nil {
if reader.GetParent().GetProtocol() == nil {
return err
}

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,17 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
protocol *api.Protocol
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tcpStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect

View File

@@ -3,13 +3,14 @@ package kafka
import (
"encoding/json"
"fmt"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"reflect"
"sort"
"strconv"
"strings"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"github.com/fatih/camelcase"
"github.com/ohler55/ojg/jp"
"github.com/ohler55/ojg/oj"
@@ -36,9 +37,14 @@ type KafkaWrapper struct {
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]api.TableData{
{
Name: "ApiKeyName",
Value: data["apiKeyName"].(string),
Selector: `request.apiKeyName`,
},
{
Name: "ApiKey",
Value: apiNames[int(data["apiKey"].(float64))],
Value: int(data["apiKey"].(float64)),
Selector: `request.apiKey`,
},
{

View File

@@ -38,7 +38,7 @@ func (d dissecting) Ping() {
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &_protocol {
return errors.New("Identified by another protocol")
}
@@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
statusQuery := ""
apiKey := ApiKey(entry.Request["apiKey"].(float64))
method := apiNames[apiKey]
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
method := entry.Request["apiKeyName"].(string)
methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method)
summary := ""
summaryQuery := ""

View File

@@ -11,6 +11,7 @@ import (
type Request struct {
Size int32 `json:"size"`
ApiKeyName string `json:"apiKeyName"`
ApiKey ApiKey `json:"apiKey"`
ApiVersion int16 `json:"apiVersion"`
CorrelationID int32 `json:"correlationID"`
@@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca
request := &Request{
Size: size,
ApiKeyName: apiNames[apiKey],
ApiKey: apiKey,
ApiVersion: apiVersion,
CorrelationID: correlationID,

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,17 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
protocol *api.Protocol
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tcpStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,17 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
protocol *api.Protocol
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tcpStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -6,6 +6,7 @@ require (
github.com/cilium/ebpf v0.8.0
github.com/go-errors/errors v1.4.2
github.com/google/gopacket v1.1.19
github.com/hashicorp/golang-lru v0.5.4
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
@@ -18,7 +19,6 @@ require (
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -33,7 +33,6 @@ require (
k8s.io/utils v0.0.0-20220127004650-9b3446523e65 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger

View File

@@ -276,6 +276,5 @@ sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

View File

@@ -16,6 +16,7 @@ import (
"runtime"
"strings"
"time"
"strconv"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
@@ -41,6 +42,7 @@ var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
var ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
// capture
var iface = flag.String("i", "en0", "Interface to read packets from")
@@ -55,7 +57,8 @@ var tls = flag.Bool("tls", false, "Enable TLS tapper")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
HostMode bool
IgnoredPorts []uint16
}
var extensions []*api.Extension // global
@@ -193,6 +196,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
logger.Log.Fatal(err)
}
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
return assembler
@@ -267,3 +272,19 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
return &tls
}
func buildIgnoredPortsList(ignoredPorts string) []uint16 {
tmp := strings.Split(ignoredPorts, ",")
result := make([]uint16, len(tmp))
for i, raw := range tmp {
v, err := strconv.Atoi(raw)
if err != nil {
continue
}
result[i] = uint16(v)
}
return result
}

View File

@@ -23,6 +23,7 @@ type tcpAssembler struct {
streamPool *reassembly.StreamPool
streamFactory *tcpStreamFactory
assemblerMutex sync.Mutex
ignoredPorts []uint16
}
// Context
@@ -48,8 +49,8 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v",
maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
@@ -57,6 +58,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
Assembler: assembler,
streamPool: streamPool,
streamFactory: streamFactory,
ignoredPorts: opts.IgnoredPorts,
}
}
@@ -83,14 +85,18 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
diagnose.AppStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
if a.shouldIgnorePort(uint16(tcp.DstPort)) {
diagnose.AppStats.IncIgnoredPacketsCount()
} else {
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
@@ -132,3 +138,13 @@ func (a *tcpAssembler) waitAndDump() {
logger.Log.Debugf("%s", a.Dump())
a.assemblerMutex.Unlock()
}
func (a *tcpAssembler) shouldIgnorePort(port uint16) bool {
for _, p := range a.ignoredPorts {
if port == p {
return true
}
}
return false
}

View File

@@ -3,11 +3,9 @@ package tap
import (
"bufio"
"io"
"io/ioutil"
"sync"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -17,50 +15,48 @@ import (
* Implements io.Reader interface (Read)
*/
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
progress *api.ReadProgress
captureTime time.Time
parent *tcpStream
packetsSeen uint
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
msgBuffer []api.TcpReaderDataMsg
msgBufferMaster []api.TcpReaderDataMsg
data []byte
progress *api.ReadProgress
captureTime time.Time
parent *tcpStream
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
func NewTcpReader(ident string, tcpId *api.TcpID, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader {
return &tcpReader{
msgQueue: msgQueue,
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
msgQueue: make(chan api.TcpReaderDataMsg),
progress: &api.ReadProgress{},
ident: ident,
tcpID: tcpId,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
emitter: emitter,
}
}
func (reader *tcpReader) run(options *api.TrafficFilteringOptions, wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(reader)
err := reader.extension.Dissector.Dissect(b, reader, options)
if err != nil {
_, err = io.Copy(ioutil.Discard, reader)
if err != nil {
logger.Log.Errorf("%v", err)
for i, extension := range extensions {
reader.reqResMatcher = reader.parent.reqResMatchers[i]
reader.counterPair = reader.parent.counterPairs[i]
b := bufio.NewReader(reader)
extension.Dissector.Dissect(b, reader, options) //nolint
if reader.isProtocolIdentified() {
break
}
reader.rewind()
}
}
@@ -81,21 +77,60 @@ func (reader *tcpReader) sendMsgIfNotClosed(msg api.TcpReaderDataMsg) {
reader.Unlock()
}
func (reader *tcpReader) isProtocolIdentified() bool {
return reader.parent.protocol != nil
}
func (reader *tcpReader) rewind() {
// Reset the data
reader.data = make([]byte, 0)
// Reset msgBuffer from the master record
reader.parent.Lock()
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
copy(reader.msgBuffer, reader.msgBufferMaster)
reader.parent.Unlock()
// Reset the read progress
reader.progress.Reset()
}
func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) {
reader.data = msg.GetBytes()
reader.captureTime = msg.GetTimestamp()
}
func (reader *tcpReader) Read(p []byte) (int, error) {
var msg api.TcpReaderDataMsg
for len(reader.msgBuffer) > 0 && len(reader.data) == 0 {
// Pop first message
if len(reader.msgBuffer) > 1 {
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
} else {
msg = reader.msgBuffer[0]
reader.msgBuffer = make([]api.TcpReaderDataMsg, 0)
}
// Get the bytes
reader.populateData(msg)
}
ok := true
for ok && len(reader.data) == 0 {
msg, ok = <-reader.msgQueue
if msg != nil {
reader.data = msg.GetBytes()
reader.captureTime = msg.GetTimestamp()
}
reader.populateData(msg)
if len(reader.data) > 0 {
reader.packetsSeen += 1
if !reader.isProtocolIdentified() {
reader.msgBufferMaster = append(
reader.msgBufferMaster,
msg,
)
}
}
}
if !ok || len(reader.data) == 0 {
return 0, io.EOF
}
@@ -142,7 +177,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
)
@@ -16,10 +15,10 @@ type tcpReassemblyStream struct {
fsmerr bool
optchecker reassembly.TCPOptionCheck
isDNS bool
tcpStream api.TcpStream
tcpStream *tcpStream
}
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream api.TcpStream) reassembly.Stream {
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream *tcpStream) reassembly.Stream {
return &tcpReassemblyStream{
ident: ident,
tcpState: reassembly.NewTCPSimpleFSM(fsmOptions),
@@ -139,17 +138,10 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
// This channel is read by an tcpReader object
diagnose.AppStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
stream := t.tcpStream.(*tcpStream)
if dir == reassembly.TCPDirClientToServer {
for i := range stream.getClients() {
reader := stream.getClient(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
t.tcpStream.client.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
} else {
for i := range stream.getServers() {
reader := stream.getServer(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
t.tcpStream.server.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
}
}
@@ -157,7 +149,7 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
func (t *tcpReassemblyStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
if t.tcpStream.GetIsTapTarget() && !t.tcpStream.GetIsClosed() {
t.tcpStream.(*tcpStream).close()
t.tcpStream.close()
}
// do not remove the connection to allow last ACK
return false

View File

@@ -13,25 +13,26 @@ import (
* In our implementation, we pass information from ReassembledSG to the TcpReader through a shared channel.
*/
type tcpStream struct {
id int64
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
clients []*tcpReader
servers []*tcpReader
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
createdAt time.Time
streamsMap api.TcpStreamMap
id int64
isClosed bool
protocol *api.Protocol
isTapTarget bool
client *tcpReader
server *tcpReader
origin api.Capture
counterPairs []*api.CounterPair
reqResMatchers []api.RequestResponseMatcher
createdAt time.Time
streamsMap api.TcpStreamMap
sync.Mutex
}
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream {
return &tcpStream{
isTapTarget: isTapTarget,
protoIdentifier: &api.ProtoIdentifier{},
streamsMap: streamsMap,
origin: capture,
isTapTarget: isTapTarget,
streamsMap: streamsMap,
origin: capture,
createdAt: time.Now(),
}
}
@@ -55,38 +56,12 @@ func (t *tcpStream) close() {
t.streamsMap.Delete(t.id)
for i := range t.clients {
reader := t.clients[i]
reader.close()
}
for i := range t.servers {
reader := t.servers[i]
reader.close()
}
t.client.close()
t.server.close()
}
func (t *tcpStream) addClient(reader *tcpReader) {
t.clients = append(t.clients, reader)
}
func (t *tcpStream) addServer(reader *tcpReader) {
t.servers = append(t.servers, reader)
}
func (t *tcpStream) getClients() []*tcpReader {
return t.clients
}
func (t *tcpStream) getServers() []*tcpReader {
return t.servers
}
func (t *tcpStream) getClient(index int) *tcpReader {
return t.clients[index]
}
func (t *tcpStream) getServer(index int) *tcpReader {
return t.servers[index]
func (t *tcpStream) addCounterPair(counterPair *api.CounterPair) {
t.counterPairs = append(t.counterPairs, counterPair)
}
func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
@@ -94,37 +69,21 @@ func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
t.protocol = protocol
// Clean the buffers
t.Lock()
defer t.Unlock()
if t.protoIdentifier.IsClosedOthers {
return
}
t.protoIdentifier.Protocol = protocol
for i := range t.clients {
reader := t.clients[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.close()
}
}
for i := range t.servers {
reader := t.servers[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.close()
}
}
t.protoIdentifier.IsClosedOthers = true
t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.Unlock()
}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tcpStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -3,7 +3,6 @@ package tap
import (
"fmt"
"sync"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
@@ -62,62 +61,50 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
if stream.GetIsTapTarget() {
stream.setId(factory.streamsMap.NextId())
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
stream.addReqResMatcher(reqResMatcher)
for _, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.addClient(
NewTcpReader(
make(chan api.TcpReaderDataMsg),
&api.ReadProgress{},
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
time.Time{},
stream,
true,
props.isOutgoing,
extension,
factory.emitter,
counterPair,
reqResMatcher,
),
)
stream.addServer(
NewTcpReader(
make(chan api.TcpReaderDataMsg),
&api.ReadProgress{},
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
time.Time{},
stream,
false,
props.isOutgoing,
extension,
factory.emitter,
counterPair,
reqResMatcher,
),
)
stream.addCounterPair(counterPair)
factory.streamsMap.Store(stream.getId(), stream)
factory.wg.Add(2)
go stream.getClient(i).run(filteringOptions, &factory.wg)
go stream.getServer(i).run(filteringOptions, &factory.wg)
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
stream.addReqResMatcher(reqResMatcher)
}
stream.client = NewTcpReader(
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
stream,
true,
props.isOutgoing,
factory.emitter,
)
stream.server = NewTcpReader(
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
stream,
false,
props.isOutgoing,
factory.emitter,
)
factory.streamsMap.Store(stream.getId(), stream)
factory.wg.Add(2)
go stream.client.run(filteringOptions, &factory.wg)
go stream.server.run(filteringOptions, &factory.wg)
}
return reassemblyStream
}

View File

@@ -57,7 +57,7 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() {
return true
}
if stream.protoIdentifier.Protocol == nil {
if stream.protocol == nil {
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
stream.close()
diagnose.AppStats.IncDroppedTcpStreams()

View File

@@ -68,7 +68,7 @@ struct fd_info {
BPF_HASH(pids_map, __u32, __u32);
BPF_LRU_HASH(ssl_write_context, __u64, struct ssl_info);
BPF_LRU_HASH(ssl_read_context, __u64, struct ssl_info);
BPF_HASH(file_descriptor_to_ipv4, __u64, struct fd_info);
BPF_LRU_HASH(file_descriptor_to_ipv4, __u64, struct fd_info);
BPF_PERF_OUTPUT(chunks_buffer);
BPF_PERF_OUTPUT(log_buffer);

View File

@@ -188,8 +188,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key
}
stream := &tlsStream{
reader: reader,
protoIdentifier: &api.ProtoIdentifier{},
reader: reader,
}
streamsMap.Store(streamsMap.NextId(), stream)

View File

@@ -94,7 +94,3 @@ func (r *tlsReader) GetEmitter() api.Emitter {
func (r *tlsReader) GetIsClosed() bool {
return false
}
func (r *tlsReader) GetExtension() *api.Extension {
return r.extension
}

View File

@@ -3,20 +3,20 @@ package tlstapper
import "github.com/up9inc/mizu/tap/api"
type tlsStream struct {
reader *tlsReader
protoIdentifier *api.ProtoIdentifier
reader *tlsReader
protocol *api.Protocol
}
func (t *tlsStream) GetOrigin() api.Capture {
return api.Ebpf
}
func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
func (t *tlsStream) GetProtocol() *api.Protocol {
return t.protocol
}
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
t.protoIdentifier.Protocol = protocol
t.protocol = protocol
}
func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {

Binary file not shown.

Binary file not shown.

View File

@@ -67,7 +67,6 @@
height: 100%
display: flex
flex-direction: column
margin-right: 10px
width: 100%
border-radius: 4px
@@ -82,17 +81,18 @@
margin-top: 10px
margin-right: 10px
.protocolsFilterList, .servicesFilter
.card
background: white
padding: 10px
border-radius: 4px
user-select: none
box-shadow: 0px 1px 5px #979797
.servicesFilter
margin-top: 10px
.servicesFilterWrapper
margin-top: 20px
margin-bottom: 3px
height: 100%
overflow: hidden
border-radius: 4px
& .servicesFilterList
height: calc(100% - 30px - 52px)
.servicesFilterList
height: calc(100% - 30px - 52px)

View File

@@ -158,6 +158,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
if (checkedProtocols.length === 0) {
setCheckedProtocols(getProtocolsForFilter.map(x => x.key))
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [getProtocolsForFilter])
useEffect(() => {
@@ -217,13 +218,13 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
<div className={styles.filterSection + ` ${isFilterClicked ? styles.show : ""}`}>
<Resizeable minWidth={170} maxWidth={320}>
<div className={styles.filterWrapper}>
<div className={styles.protocolsFilterList}>
<div className={styles.card}>
<SelectList items={getProtocolsForFilter} checkBoxWidth="5%" tableName={"PROTOCOLS"} multiSelect={true}
checkedValues={checkedProtocols} setCheckedValues={onProtocolsChange} tableClassName={styles.filters}
inputSearchClass={styles.servicesFilterSearch} isFilterable={false}/>
</div>
<div className={styles.servicesFilter}>
<div className={styles.servicesFilterList}>
<div className={styles.servicesFilterWrapper + ` ${styles.card}`}>
<div className={styles.servicesFilterList}>
<SelectList items={getServicesForFilter} tableName={"SERVICES"} tableClassName={styles.filters} multiSelect={true}
checkBoxWidth="5%" checkedValues={checkedServices} setCheckedValues={onServiceChanges} inputSearchClass={styles.servicesFilterSearch}/>
</div>

View File

@@ -85,7 +85,7 @@ const SelectList: React.FC<Props> = ({ items, tableName, checkedValues = [], mul
const tableBody = filteredValues.length === 0 ?
<tr>
<td colSpan={2}>
<td colSpan={2} className={styles.displayBlock}>
<NoDataMessage messageText={noItemsMessage} />
</td>
</tr>

View File

@@ -22,8 +22,8 @@ export const StatusBar: React.FC<StatusBarProps> = ({isDemoBannerView, disabled}
const {uniqueNamespaces, amountOfPods, amountOfTappedPods, amountOfUntappedPods} = useRecoilValue(tappingStatusDetails);
return <div style={{opacity: disabled ? 0.4 : 1}} className={`${isDemoBannerView ? `${style.banner}` : ''} ${style.statusBar} ${(expandedBar && !disabled ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)} data-cy="expandedStatusBar">
<div className={style.podsCount}>
{tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
{!disabled && tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
<span className={style.podsCountText} data-cy="podsCountText">
{`Tapping ${amountOfUntappedPods > 0 ? amountOfTappedPods + " / " + amountOfPods : amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
</span>

View File

@@ -1,44 +1,70 @@
@import '../../../variables.module'
@import '../../../components'
.selectListTable
overflow: auto
height: 100%
user-select: none // when resizble moved we get unwanted beheviour
height: 100%
table
width: 100%
margin-top: 20px
border-collapse: collapse
table-layout: fixed
height: 100%
display: flex
flex-flow: column
height: 100%
th
color: $blue-gray
text-align: left
padding: 10px
position: sticky
top: 0
background: $main-background-color
font-size: 12px
thead
display: table
table-layout: fixed
flex: 0 0 auto
width: calc(100% - 0.9em)
tr
border-bottom-width: 1px
border-bottom-color: $data-background-color
border-bottom-style: solid
width: 100%
tbody
display: block
overflow: auto
width: 100%
height: 100%
flex: 1 1 auto
td
color: $light-gray
padding: 10px
font-size: 11px
font-weight: 600
padding-top: 5px
padding-bottom: 5px
tbody tr:hover
background: $header-background-color
th
color: $blue-gray
text-align: left
padding: 10px
background: $main-background-color
font-size: 12px
tr
border-bottom-width: 1px
border-bottom-color: $data-background-color
border-bottom-style: solid
width: 100%
display: block
position: relative
display: table
table-layout: fixed
td
color: $light-gray
padding: 10px
font-size: 11px
font-weight: 600
padding-top: 5px
padding-bottom: 5px
.nowrap
white-space: nowrap
.totalSelected
font-size: 12px
color: $light-blue-color
font-weight: 700
font-size: 12px
color: $light-blue-color
font-weight: 700
.displayBlock
display: block
.filterInput
margin-bottom: 20px