Compare commits

...

3 Commits

Author SHA1 Message Date
RamiBerm
21902b5f86 Fix tapping status falling out of sync (#898)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 14:54:25 +02:00
gadotroee
a4d0e250c9 Fixing acceptance tests (#900) 2022-03-21 14:21:08 +02:00
M. Mert Yıldıran
5455220a3a Add an indicator for the eBPF sourced entries (#886)
* Define `Capture` type and expect it as an argument in `Dissect` method

* Add an indicator for the  eBPF sourced entries

* Fix the Go lint error

* Fix the logic in the UI

* Update the expected JSONs

* Add TODOs

* Add `UndefinedCapture` constant

* Define `CaptureTypes` enum
2022-03-17 09:32:09 +03:00
40 changed files with 244 additions and 198 deletions

View File

@@ -1,8 +1,7 @@
const columns = {podName : 1, namespace : 2, tapping : 3};
const greenStatusImageSrc = '/static/media/success.662997eb.svg';
function getDomPathInStatusBar(line, column) {
return `.expandedStatusBar > :nth-child(2) > > :nth-child(2) > :nth-child(${line}) > :nth-child(${column})`;
return `[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > :nth-child(${line}) > :nth-child(${column})`;
}
export function checkLine(line, expectedValues) {
@@ -12,14 +11,14 @@ export function checkLine(line, expectedValues) {
cy.get(getDomPathInStatusBar(line, columns.namespace)).invoke('text').then(namespaceValue => {
expect(namespaceValue).to.equal(expectedValues.namespace);
cy.get(getDomPathInStatusBar(line, columns.tapping)).children().should('have.attr', 'src', greenStatusImageSrc);
cy.get(getDomPathInStatusBar(line, columns.tapping)).children().should('have.attr', 'src').and("match", /success.*\.svg/);
});
});
}
export function findLineAndCheck(expectedValues) {
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) > > :nth-child(1)').then(pods => {
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) > > :nth-child(2)').then(namespaces => {
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > > :nth-child(1)').then(pods => {
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > > :nth-child(2)').then(namespaces => {
// organizing namespaces array
const podObjectsArray = Object.values(pods ?? {});
const namespacesObjectsArray = Object.values(namespaces ?? {});

View File

@@ -45,7 +45,7 @@ export function leftTextCheck(entryNum, path, expectedText) {
export function leftOnHoverCheck(entryNum, path, filterName) {
cy.get(`#list #entry-${entryNum} ${path}`).trigger('mouseover');
cy.get(`#list #entry-${entryNum} .Queryable-Tooltip`).invoke('text').should('match', new RegExp(filterName));
cy.get(`#list #entry-${entryNum} [data-cy='QueryableTooltip']`).invoke('text').should('match', new RegExp(filterName));
}
export function rightTextCheck(path, expectedText) {
@@ -54,7 +54,7 @@ export function rightTextCheck(path, expectedText) {
export function rightOnHoverCheck(path, expectedText) {
cy.get(`#rightSideContainer ${path}`).trigger('mouseover');
cy.get(`#rightSideContainer .Queryable-Tooltip`).invoke('text').should('match', new RegExp(expectedText));
cy.get(`#rightSideContainer [data-cy='QueryableTooltip']`).invoke('text').should('match', new RegExp(expectedText));
}
export function checkThatAllEntriesShown() {

View File

@@ -8,6 +8,6 @@ it('check', function () {
cy.visit(`http://localhost:${port}`);
cy.wait('@statusTap').its('response.statusCode').should('match', /^2\d{2}/);
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="expandedStatusBar"]`).trigger('mouseover',{force: true});
findLineAndCheck(getExpectedDetailsDict(podName, namespace));
});

View File

@@ -2,7 +2,7 @@ import {findLineAndCheck, getExpectedDetailsDict} from '../testHelpers/StatusBar
it('opening', function () {
cy.visit(Cypress.env('testUrl'));
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
});
[1, 2, 3].map(doItFunc);

View File

@@ -3,9 +3,9 @@ import {getExpectedDetailsDict, checkLine} from '../testHelpers/StatusBarHelper'
it('opening', function () {
cy.visit(Cypress.env('testUrl'));
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) >').should('have.length', 1); // one line
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) >').should('have.length', 1); // one line
checkLine(1, getExpectedDetailsDict(Cypress.env('name'), Cypress.env('namespace')));
});

View File

@@ -26,7 +26,7 @@ it('opening mizu', function () {
verifyMinimumEntries();
it('top bar check', function () {
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
podsArray.map(findLineAndCheck);
cy.reload();
});
@@ -205,6 +205,7 @@ function checkFilter(filterDetails){
// checks the hover on the last entry (the only one in DOM at the beginning)
leftOnHoverCheck(totalEntries - 1, leftSidePath, name);
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${name}${applyByEnter ? '{alt+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));

View File

@@ -94,10 +94,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
continue
}
if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
if pod.Spec.NodeName != "" {
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
@@ -137,10 +133,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
continue
}
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf(
fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,

View File

@@ -48,6 +48,16 @@ type Extension struct {
Dissector Dissector
}
type Capture string
const (
UndefinedCapture Capture = ""
Pcap Capture = "pcap"
Envoy Capture = "envoy"
Linkerd Capture = "linkerd"
Ebpf Capture = "ebpf"
)
type ConnectionInfo struct {
ClientIP string
ClientPort string
@@ -84,6 +94,7 @@ type RequestResponsePair struct {
// `Protocol` is modified in the later stages of data propagation. Therefore it's not a pointer.
type OutputChannelItem struct {
Protocol Protocol
Capture Capture
Timestamp int64
ConnectionInfo *ConnectionInfo
Pair *RequestResponsePair
@@ -102,7 +113,7 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
@@ -132,6 +143,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) {
type Entry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto"`
Capture Capture `json:"capture"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
@@ -162,6 +174,7 @@ type EntryWrapper struct {
type BaseEntry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto,omitempty"`
Capture Capture `json:"capture"`
Summary string `json:"summary,omitempty"`
SummaryQuery string `json:"summaryQuery,omitempty"`
Status int `json:"status"`

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect

View File

@@ -94,7 +94,7 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@@ -108,6 +108,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Capture: capture,
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
case *MethodFrame:
@@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
default:
@@ -222,6 +222,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["method"] = request["method"]
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -283,6 +284,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: 0,

View File

@@ -122,7 +122,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -140,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -104,13 +104,14 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
} else {
item.Protocol = http2Protocol
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -147,12 +148,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -190,6 +192,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return

View File

@@ -86,7 +86,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
@@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -157,11 +157,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -259,6 +260,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
httpPair, _ := json.Marshal(item.Pair)
return &api.Entry{
Protocol: item.Protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -291,6 +293,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -124,7 +124,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -142,7 +142,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect

View File

@@ -35,7 +35,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
@@ -49,7 +49,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@@ -68,6 +68,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: _protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -190,6 +191,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -258,6 +258,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
item := &api.OutputChannelItem{
Protocol: _protocol,
Capture: capture,
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
@@ -23,6 +23,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
@@ -35,7 +36,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
@@ -52,6 +53,7 @@ func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,

View File

@@ -34,7 +34,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
@@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -71,6 +71,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -113,6 +114,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@@ -95,7 +95,8 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
// TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method
err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -158,7 +158,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, isRequest, tcpid, &api.CounterPair{},
err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
if err != nil {

View File

@@ -5,10 +5,10 @@ import Api, {getWebsocketUrl} from "./api";
const api = Api.getInstance()
const App = () => {
const {message,error,isOpen, openSocket, closeSocket, sendQuery} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQuery: sendQuery}}
sendQuery(DEFAULT_QUERY);
const App = () => {
const {message,error,isOpen, openSocket, closeSocket, sendQueryWhenWsOpen} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQueryWhenWsOpen: sendQueryWhenWsOpen}}
sendQueryWhenWsOpen(DEFAULT_QUERY);
useEffect(() => {
return () =>{

View File

@@ -1,6 +1,6 @@
{
"name": "@up9/mizu-common",
"version": "1.0.125",
"version": "1.0.128",
"description": "Made with create-react-library",
"author": "",
"license": "MIT",
@@ -30,7 +30,6 @@
"react":"^17.0.2",
"react-dom": "^17.0.2",
"recoil": "^0.5.2",
"react-copy-to-clipboard": "^5.0.3",
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10"

View File

@@ -62,6 +62,13 @@
width: 185px
text-align: left
.capture
margin-top: -60px
.capture img
height: 20px
z-index: 1000
.endpointServiceContainer
display: flex
flex-direction: column

View File

@@ -4,6 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz';
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../../UI/Protocol"
import eBPFLogo from '../assets/ebpf.png';
import {Summary} from "../../UI/Summary";
import Queryable from "../../UI/Queryable";
import ingoingIconSuccess from "assets/ingoing-traffic-success.svg"
@@ -24,6 +25,7 @@ interface TCPInterface {
interface Entry {
proto: ProtocolInterface,
capture: string,
method?: string,
methodQuery?: string,
summary: string,
@@ -52,6 +54,14 @@ interface EntryProps {
headingMode: boolean;
}
enum CaptureTypes {
UndefinedCapture = "",
Pcap = "pcap",
Envoy = "envoy",
Linkerd = "linkerd",
Ebpf = "ebpf",
}
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) => {
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
@@ -154,6 +164,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
protocol={entry.proto}
horizontal={false}
/> : null}
{/* TODO: Update the code below once we have api.Pcap, api.Envoy and api.Linkerd distinction in the backend */}
{entry.capture === CaptureTypes.Ebpf ? <div className={styles.capture}>
<Queryable
query={`capture == "${entry.capture}"`}
displayIconOnMouseOver={true}
flipped={false}
style={{position: "absolute"}}
>
<img src={eBPFLogo} alt="eBPF"/>
</Queryable>
</div> : null}
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}

View File

@@ -8,7 +8,8 @@ import { EntryDetailed } from "./EntryDetailed";
import playIcon from 'assets/run.svg';
import pauseIcon from 'assets/pause.svg';
import variables from '../../variables.module.scss';
import { toast } from 'react-toastify';
import { toast,ToastContainer } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import debounce from 'lodash/debounce';
import { RecoilRoot, RecoilState, useRecoilState, useRecoilValue, useSetRecoilState } from "recoil";
import entriesAtom from "../../recoil/entries";
@@ -21,7 +22,6 @@ import TrafficViewerApi from "./TrafficViewerApi";
import { StatusBar } from "../UI/StatusBar";
import tappingStatusAtom from "../../recoil/tappingStatus/atom";
const useLayoutStyles = makeStyles(() => ({
details: {
flex: "0 0 50%",
@@ -45,26 +45,22 @@ const useLayoutStyles = makeStyles(() => ({
interface TrafficViewerProps {
setAnalyzeStatus?: (status: any) => void;
api?: any
message?: {}
error?: {}
isWebSocketOpen: boolean
trafficViewerApiProp: TrafficViewerApi,
actionButtons?: JSX.Element,
isShowStatusBar?: boolean
isShowStatusBar?: boolean,
webSocketUrl : string
}
const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message, error, isWebSocketOpen, trafficViewerApiProp, actionButtons, isShowStatusBar }) => {
export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus, trafficViewerApiProp, actionButtons,isShowStatusBar,webSocketUrl}) => {
const classes = useLayoutStyles();
const [entries, setEntries] = useRecoilState(entriesAtom);
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
const [wsConnection, setWsConnection] = useRecoilState(websocketConnectionAtom);
const query = useRecoilValue(queryAtom);
const [queryToSend, setQueryToSend] = useState("")
const setTrafficViewerApiState = useSetRecoilState(trafficViewerApiAtom as RecoilState<TrafficViewerApi>)
const [tappingStatus, setTappingStatus] = useRecoilState(tappingStatusAtom);
const [noMoreDataTop, setNoMoreDataTop] = useState(false);
const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
@@ -107,6 +103,7 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
handleQueryChange(query);
}, [query, handleQueryChange]);
const ws = useRef(null);
const listEntry = useRef(null);
const openWebSocket = (query: string, resetEntries: boolean) => {
@@ -117,98 +114,99 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
setLeftOffTop(null);
setNoMoreDataTop(false);
}
setQueryToSend(query)
trafficViewerApiProp.webSocket.open();
}
const onmessage = useCallback((e) => {
if (!e?.data) return;
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
const newEntries = [...entries, entry];
if (newEntries.length === 10001) {
setLeftOffTop(newEntries[0].entry.id);
newEntries.shift();
setNoMoreDataTop(false);
}
setEntries(newEntries);
break;
case "status":
setTappingStatus(message.tappingStatus);
break;
case "analyzeStatus":
setAnalyzeStatus(message.analyzeStatus);
break;
case "outboundLink":
onTLSDetected(message.Data.DstIP);
break;
case "toast":
toast[message.data.type](message.data.text, {
position: "bottom-right",
theme: "colored",
autoClose: message.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
break;
case "queryMetadata":
setQueriedCurrent(queriedCurrent + message.data.current);
setQueriedTotal(message.data.total);
setLeftOffBottom(message.data.leftOff);
setTruncatedTimestamp(message.data.truncatedTimestamp);
if (leftOffTop === null) {
setLeftOffTop(message.data.leftOff - 1);
}
break;
case "startTime":
setStartTime(message.data);
break;
default:
console.error(
`unsupported websocket message type, Got: ${message.messageType}`
);
ws.current = new WebSocket(webSocketUrl);
ws.current.onopen = () => {
setWsConnection(WsConnectionStatus.Connected);
sendQueryWhenWsOpen(query);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [message]);
useEffect(() => {
onmessage(message)
}, [message, onmessage])
useEffect(() => {
onerror(error)
}, [error])
useEffect(() => {
isWebSocketOpen ? setWsConnection(WsConnectionStatus.Connected) : setWsConnection(WsConnectionStatus.Closed)
trafficViewerApiProp.webSocket.sendQuery(queryToSend)
}, [isWebSocketOpen, queryToSend, setWsConnection])
const onerror = (event) => {
console.error("WebSocket error:", event);
if (query) {
openWebSocket(`(${query}) and leftOff(${leftOffBottom})`, false);
} else {
openWebSocket(`leftOff(${leftOffBottom})`, false);
ws.current.onclose = () => {
setWsConnection(WsConnectionStatus.Closed);
}
ws.current.onerror = (event) => {
console.error("WebSocket error:", event);
if (query) {
openWebSocket(`(${query}) and leftOff(${leftOffBottom})`, false);
} else {
openWebSocket(`leftOff(${leftOffBottom})`, false);
}
}
}
const sendQueryWhenWsOpen = (query) => {
setTimeout(() => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify({"query": query, "enableFullEntries": false}));
} else {
sendQueryWhenWsOpen(query);
}
}, 500)
}
if (ws.current) {
ws.current.onmessage = (e) => {
if (!e?.data) return;
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
const newEntries = [...entries, entry];
if (newEntries.length === 10001) {
setLeftOffTop(newEntries[0].entry.id);
newEntries.shift();
setNoMoreDataTop(false);
}
setEntries(newEntries);
break;
case "status":
setTappingStatus(message.tappingStatus);
break;
case "analyzeStatus":
setAnalyzeStatus(message.analyzeStatus);
break;
case "outboundLink":
onTLSDetected(message.Data.DstIP);
break;
case "toast":
toast[message.data.type](message.data.text, {
position: "bottom-right",
theme: "colored",
autoClose: message.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
break;
case "queryMetadata":
setQueriedCurrent(queriedCurrent + message.data.current);
setQueriedTotal(message.data.total);
setLeftOffBottom(message.data.leftOff);
setTruncatedTimestamp(message.data.truncatedTimestamp);
if (leftOffTop === null) {
setLeftOffTop(message.data.leftOff - 1);
}
break;
case "startTime":
setStartTime(message.data);
break;
default:
console.error(
`unsupported websocket message type, Got: ${message.messageType}`
);
}
};
}
useEffect(() => {
setTrafficViewerApiState({...trafficViewerApiProp, webSocket : {close : () => ws.current.close()}});
(async () => {
setTrafficViewerApiState(trafficViewerApiProp)
openWebSocket("leftOff(-1)", true);
try {
try{
const tapStatusResponse = await trafficViewerApiProp.tapStatus();
setTappingStatus(tapStatusResponse);
if (setAnalyzeStatus) {
if(setAnalyzeStatus) {
const analyzeStatusResponse = await trafficViewerApiProp.analyzeStatus();
setAnalyzeStatus(analyzeStatusResponse);
}
@@ -220,8 +218,8 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
}, []);
const toggleConnection = () => {
if (wsConnection === WsConnectionStatus.Closed) {
ws.current.close();
if (wsConnection !== WsConnectionStatus.Connected) {
if (query) {
openWebSocket(`(${query}) and leftOff(-1)`, true);
} else {
@@ -230,10 +228,6 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
scrollableRef.current.jumpToBottom();
setIsSnappedToBottom(true);
}
else if (wsConnection === WsConnectionStatus.Connected) {
trafficViewerApiProp.webSocket.close()
setWsConnection(WsConnectionStatus.Closed);
}
}
const onTLSDetected = (destAddress: string) => {
@@ -270,7 +264,7 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
const onSnapBrokenEvent = () => {
setIsSnappedToBottom(false);
if (wsConnection === WsConnectionStatus.Connected) {
trafficViewerApiProp.webSocket.close()
ws.current.close();
}
}
@@ -330,16 +324,17 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
setAddressesWithTLS={setAddressesWithTLS}
userDismissedTLSWarning={userDismissedTLSWarning}
setUserDismissedTLSWarning={setUserDismissedTLSWarning} />
<ToastContainer/>
</div>
);
};
const MemoiedTrafficViewer = React.memo(TrafficViewer)
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message, isWebSocketOpen, trafficViewerApiProp, actionButtons, isShowStatusBar = true }) => {
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, trafficViewerApiProp, actionButtons, isShowStatusBar = true ,webSocketUrl}) => {
return <RecoilRoot>
<MemoiedTrafficViewer message={message} isWebSocketOpen={isWebSocketOpen} actionButtons={actionButtons} isShowStatusBar={isShowStatusBar}
<MemoiedTrafficViewer actionButtons={actionButtons} isShowStatusBar={isShowStatusBar} webSocketUrl={webSocketUrl}
trafficViewerApiProp={trafficViewerApiProp} setAnalyzeStatus={setAnalyzeStatus} />
</RecoilRoot>
}
export default TrafficViewerContainer
export default TrafficViewerContainer

View File

@@ -6,10 +6,8 @@ type TrafficViewerApi = {
getEntry : (entryId : any, query:string) => any
getRecentTLSLinks : () => any,
webSocket : {
open : () => {},
close : () => {},
sendQuery : (query:string) => {}
close : () => {}
}
}
export default TrafficViewerApi
export default TrafficViewerApi

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

View File

@@ -46,6 +46,7 @@ const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
displayIconOnMouseOver={true}
flipped={false}
iconStyle={{marginTop: "52px", marginRight: "10px", zIndex: 1000}}
tooltipStyle={{marginTop: "-22px", zIndex: 1001}}
>
<span
className={`${styles.base} ${styles.vertical}`}

View File

@@ -11,11 +11,12 @@ interface Props {
iconStyle?: object,
className?: string,
useTooltip?: boolean,
tooltipStyle?: object,
displayIconOnMouseOver?: boolean,
flipped?: boolean,
}
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip= true, displayIconOnMouseOver = false, flipped = false, children}) => {
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip = true, tooltipStyle = null, displayIconOnMouseOver = false, flipped = false, children}) => {
const [showAddedNotification, setAdded] = useState(false);
const [showTooltip, setShowTooltip] = useState(false);
const [queryState, setQuery] = useRecoilState(queryAtom);
@@ -48,12 +49,12 @@ const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTool
</CopyToClipboard> : null;
return (
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
style={style} onMouseOver={ e => setShowTooltip(true)} onMouseLeave={ e => setShowTooltip(false)}>
{flipped && addButton}
{children}
{!flipped && addButton}
{useTooltip && showTooltip && <span className={QueryableStyle.QueryableTooltip}>{query}</span>}
{useTooltip && showTooltip && <span data-cy={"QueryableTooltip"} className={QueryableStyle.QueryableTooltip} style={tooltipStyle}>{query}</span>}
</div>
);
};

View File

@@ -11,15 +11,14 @@ const pluralize = (noun: string, amount: number) => {
}
export const StatusBar = () => {
const tappingStatus = useRecoilValue(tappingStatusAtom);
const [expandedBar, setExpandedBar] = useState(false);
const {uniqueNamespaces, amountOfPods, amountOfTappedPods, amountOfUntappedPods} = useRecoilValue(tappingStatusDetails);
return <div className={`${style.statusBar} ${(expandedBar ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)}>
return <div className={`${style.statusBar} ${(expandedBar ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)} data-cy="expandedStatusBar">
<div className={style.podsCount}>
{tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
<span className={style.podsCountText}>
<span className={style.podsCountText} data-cy="podsCountText">
{`Tapping ${amountOfUntappedPods > 0 ? amountOfTappedPods + " / " + amountOfPods : amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
</span>
</div>

View File

@@ -18,7 +18,9 @@ const useWS = (wsUrl: string) => {
const onMessage = (e) => { setMessage(e) }
const onError = (e) => setError(e)
const onOpen = () => { setisOpen(true) }
const onClose = () => setisOpen(false)
const onClose = () => {
setisOpen(false)
}
const openSocket = () => {
ws.current = new WebSocket(wsUrl)
@@ -36,13 +38,17 @@ const useWS = (wsUrl: string) => {
ws.current.removeEventListener("close", onClose)
}
const sendQuery = (query: string) => {
if (ws.current && (ws.current.readyState === WebSocketReadyState.OPEN)) {
ws.current.send(JSON.stringify({ "query": query, "enableFullEntries": false }));
}
const sendQueryWhenWsOpen = (query) => {
setTimeout(() => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify({"query": query, "enableFullEntries": false}));
} else {
sendQueryWhenWsOpen(query);
}
}, 500)
}
return { message, error, isOpen, openSocket, closeSocket, sendQuery }
return { message, error, isOpen, openSocket, closeSocket, sendQueryWhenWsOpen }
}
export default useWS
export default useWS

View File

@@ -13,7 +13,7 @@
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10",
"@uiw/react-textarea-code-editor": "^1.4.12",
"@up9/mizu-common": "1.0.125",
"@up9/mizu-common": "^1.0.128",
"axios": "^0.25.0",
"core-js": "^3.20.2",
"craco-babel-loader": "^1.0.3",

View File

@@ -1,11 +1,11 @@
import React, { useEffect } from "react";
import React, {useEffect} from "react";
import { Button } from "@material-ui/core";
import Api, {getWebsocketUrl} from "../../../helpers/api";
import Api,{getWebsocketUrl} from "../../../helpers/api";
import debounce from 'lodash/debounce';
import {useSetRecoilState, useRecoilState} from "recoil";
import {useCommonStyles} from "../../../helpers/commonStyle"
import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
import TrafficViewer ,{useWS,DEFAULT_QUERY} from "@up9/mizu-common"
import TrafficViewer from "@up9/mizu-common"
import "@up9/mizu-common/dist/index.css"
import oasModalOpenAtom from "../../../recoil/oasModalOpen/atom";
import serviceMap from "../../assets/serviceMap.svg";
@@ -22,11 +22,10 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const setServiceMapModalOpen = useSetRecoilState(serviceMapModalOpenAtom);
const [openOasModal, setOpenOasModal] = useRecoilState(oasModalOpenAtom);
const {message,error,isOpen, openSocket, closeSocket, sendQuery} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQuery: sendQuery}}
const trafficViewerApi = {...api}
const handleOpenOasModal = () => {
closeSocket()
//closeSocket() -- Todo: Add Close webSocket
setOpenOasModal(true);
}
@@ -56,18 +55,16 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
</Button>}
</div>
sendQuery(DEFAULT_QUERY);
useEffect(() => {
return () => {
closeSocket()
//closeSocket()
}
},[])
return (
<>
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} message={message} error={error} isWebSocketOpen={isOpen}
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={getWebsocketUrl()}
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!openOasModal}/>
</>
);
};
};

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB