Compare commits

...

22 Commits

Author SHA1 Message Date
Igor Gov
8f64fdaa61 Alert to slack if release action fails (#1117)
* Trigger mizu ent stg deployment on release
2022-05-29 15:16:45 +03:00
AmitUp9
7edb0b153b added condition that if no selector wouldn't be tooltip also (#1114) 2022-05-26 15:19:00 +03:00
AmitUp9
569a687fdf Line numbers counter func added to utils (#1112)
* #run_acceptance_tests

* added new line to the end of the file

* #run_acceptance_tests

* linter fix

* more linter issues fix
2022-05-25 13:44:50 +03:00
AmitUp9
11e8b5eb65 TRA_4437- disable line numbers checkbox when only one line in content (#1109)
* number of lines state added

* added useEffect to update to showLineNubers state dynamically

* small cr fixes
2022-05-24 15:07:40 +03:00
AmitUp9
3901f3f3fe change icon style css to move the icon so it can be clicked (#1111) 2022-05-24 14:16:27 +03:00
Nimrod Gilboa Markevich
2f1cc21fcb Change redact to opt in (#1104) 2022-05-24 14:12:37 +03:00
leon-up9
433253a27b font & padding change (#1108)
Co-authored-by: Leon <>
2022-05-24 12:09:13 +03:00
RoyUP9
00cc94fbe5 Changed OAS generator to get entries by push (#1103) 2022-05-22 14:53:48 +03:00
Nimrod Gilboa Markevich
8feef78ab1 Ignore mizu traffic in performance tests (#1102) 2022-05-22 12:08:22 +03:00
Nimrod Gilboa Markevich
992abc99bc Disable redaction in performance tests (#1101) 2022-05-22 12:05:24 +03:00
Nimrod Gilboa Markevich
486d0b1088 Support tapper profiling on macOS (#1098) 2022-05-19 15:11:46 +03:00
David Levanon
f61a02d288 fix heap and goroutines metrics (#1099) 2022-05-19 14:55:13 +03:00
M. Mert Yıldıran
03694e57c0 Fix checkFilterByMethod in acceptance tests (#1094)
* Fix `checkFilterByMethod` in acceptance tests

* #run_acceptance_tests

* Add `numberOfRecords` parameter to test specs #run_acceptance_tests

* Fix the values #run_acceptance_tests

* Fix the values #run_acceptance_tests

* Fix #run_acceptance_tests

* #run_acceptance_tests

* Reduce value duplication #run_acceptance_tests

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2022-05-19 13:37:50 +03:00
gadotroee
1760afda2b Add tolerations to api server pod (#1035) 2022-05-19 10:46:36 +03:00
M. Mert Yıldıran
522e2cc3da Call SetProtocol in AMQP faster and remove GetProtocol method (#1097)
* Call `SetProtocol` in AMQP faster and remove `GetProtocol` method

* #run_acceptance_tests

* Remove the unused fields from the test mocks #run_acceptance_tests
2022-05-19 10:07:40 +03:00
Nimrod Gilboa Markevich
ab38f4c011 Add profiling tools (#1087)
* Add gin-contrib/pprof dependency

* Run pprof server on agent with --profiler flag

* Add --profiler flag to cli

* Fix error message

* Print cpu usage percentage

* measure cpu of current pid instead of globaly on the system

* Add scripts to plot performance

* Plot packetsCount in analysis

* Concat to DataFrame

* Plot in turbo colorscheme

* Make COLORMAP const

* Fix rss units

* Reduce code repetition by adding function for plotting

* Allow grouping based on filenames

* Temporary: Marked with comments where to disable code for experiments

* Add newline at end of file

* Add tap.cpuprofile flag. Change memprofile flag to tap.memprofile

* create tapper modes for debugging using env vars

* Fix rss plot units (MB instead of bytes)

* Remove comment

* Add info to plot script

* Remove tap.cpumemprofile. Rename tap.memprofile to memprofile

* Remove unused import

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Remove whitespaces

Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>

* Rename debug env vars

* Create package for debug env vars, read each env var once

* Run go mod tidy

* Increment MatchedPairs before emitting

* Only count cores once

* Count virtual and physical cores

* Add dbgctl replace in cli

* Fix lint: Check return values

* Add tap/dbgctl to test-lint make rule

* Replace tap/dbgctl in all modules

* #run_acceptance_tests

* Copy dbgctl module to docker image

* Debug/profile tapper benchmark (#1093)

* add mizu debug env to avoid all extensions

* add readme + run_tapper_benchmark.sh

* temporary change branch name

* fix readme

* fix MIZU_BENCHMARK_CLIENTS_COUNT env

* change tap target to tcp stream

* track live tcp streams

* pr fixes

* rename tapperPacketsCount to ignored_packets_count

* change mizu tapper to mizu debugg

Co-authored-by: David Levanon <dvdlevanon@gmail.com>
Co-authored-by: M. Mert Yıldıran <mehmet@up9.com>
2022-05-18 15:42:13 +03:00
David Levanon
a9de4f0bba stop tapping self tapper traffic (#1083)
* stop tapping self tapper traffic

* run go mod tidy

* allow to explicitly ignore ports

* remove unused code

* remove shared from tap + go mod tidy

* move ignroe ports to tapper

* rename TapperPacketsCount to IgnoredPacketsCount

* don't check null - go is smart

* remove nil check
2022-05-18 15:13:10 +03:00
lirazyehezkel
948af518b5 Fix double info icon in tapping status bar (#1095) 2022-05-18 14:40:20 +03:00
M. Mert Yıldıran
73448b514e Fix checkFilter method in the acceptance tests (#1092)
* Fix `checkFilter` method in the acceptance tests

* #run_acceptance_tests

* Remove duplicate assertion #run_acceptance_tests
2022-05-17 14:41:08 +03:00
M. Mert Yıldıran
fc194354bc Fix a nil pointer dereference error that occurs in tcpReader's Read method (#1090)
* Fix a `nil pointer dereference` error that occurs in `tcpReader`'s `Read` method

* #run_acceptance_tests

* #run_acceptance_tests

* Revert "Fix a `nil pointer dereference` error that occurs in `tcpReader`'s `Read` method"

This reverts commit ccef6cb393.

* Fix the race condition using locks #run_acceptance_tests
2022-05-17 13:13:50 +03:00
M. Mert Yıldıran
8418802c7e Handle the wait for fetch in acceptance tests better (#1088)
* Handle the wait for fetch in acceptance tests better

* #run_acceptance_tests

* Fix the error #run_acceptance_tests

* Fix `waitForFetchAndPause` and `checkFilter` #run_acceptance_tests

* Fix the tests #run_acceptance_tests
2022-05-17 11:01:47 +03:00
M. Mert Yıldıran
bfa834e840 Spawn only two Goroutines per TCP stream (#1062)
* Spawn only two Goroutines per TCP stream

* Fix the linter error

* Use `isProtocolIdentified` method instead

* Fix the `Read` method of `tcpReader`

* Remove unnecessary `append`

* Copy to buffer only a message is received

* Remove `exhaustBuffer` field and add `rewind` function

* Rename `buffer` field to `pastData`

* Update tap/tcp_reader.go

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>

* Use `copy` instead of assignment

* No lint

* #run_acceptance_tests

* Fix `rewind` #run_acceptance_tests

* Fix the buffering algorithm #run_acceptance_tests

* Add `TODO`

* Fix the problems in AMQP and Kafka #run_acceptance_tests

* Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests

* Have a single `*bytes.Buffer`

* Revert "Have a single `*bytes.Buffer`"

This reverts commit fad96a288a.

* Revert "Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests"

This reverts commit 0fc70bffe2.

* Fix the early timing out issue #run_acceptance_tests

* Remove `NewBytes()` method

* Update the `NewTcpReader` method signature #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-05-16 16:06:36 +03:00
75 changed files with 1072 additions and 634 deletions

View File

@@ -290,3 +290,15 @@ jobs:
tag: ${{ steps.versioning.outputs.version }}
prerelease: ${{ github.ref != 'refs/heads/main' }}
bodyFile: 'cli/bin/README.md'
- name: Slack notification on failure
uses: ravsamhq/notify-slack-action@v1
if: always()
with:
status: ${{ job.status }}
notification_title: 'Mizu enterprise {workflow} has {status_message}'
message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit <{commit_url}|{commit_sha}> by ${{ github.event.head_commit.author.name }} <${{ github.event.head_commit.author.email }}> ```${{ github.event.head_commit.message }}```'
footer: 'Linked Repo <{repo_url}|{repo}>'
notify_when: 'failure'
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

View File

@@ -68,6 +68,7 @@ COPY shared/go.mod shared/go.mod ../shared/
COPY logger/go.mod logger/go.mod ../logger/
COPY tap/go.mod tap/go.mod ../tap/
COPY tap/api/go.mod ../tap/api/
COPY tap/dbgctl/go.mod ../tap/dbgctl/
COPY tap/extensions/amqp/go.mod ../tap/extensions/amqp/
COPY tap/extensions/http/go.mod ../tap/extensions/http/
COPY tap/extensions/kafka/go.mod ../tap/extensions/kafka/

View File

@@ -83,6 +83,7 @@ test-lint: ## Run lint on all modules
cd cli && golangci-lint run
cd acceptanceTests && golangci-lint run
cd tap/api && golangci-lint run
cd tap/dbgctl && golangci-lint run
cd tap/extensions/ && for D in */; do cd $$D && golangci-lint run && cd ..; done
test-cli: ## Run cli tests

View File

@@ -58,7 +58,7 @@ export function rightOnHoverCheck(path, expectedText) {
}
export function checkFilterByMethod(funcDict) {
const {protocol, method, methodQuery, summary, summaryQuery} = funcDict;
const {protocol, method, methodQuery, summary, summaryQuery, numberOfRecords} = funcDict;
const summaryDict = getSummaryDict(summary, summaryQuery);
const methodDict = getMethodDict(method, methodQuery);
const protocolDict = getProtocolDict(protocol.name, protocol.text);
@@ -69,47 +69,53 @@ export function checkFilterByMethod(funcDict) {
cy.get('[type="submit"]').click();
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
cy.get('#entries-length').should('not.have.text', '0').then(() => {
cy.get(`#list [id]`).then(elements => {
const listElmWithIdAttr = Object.values(elements);
let doneCheckOnFirst = false;
waitForFetch(numberOfRecords);
pauseStream();
cy.get('#entries-length').invoke('text').then(len => {
resizeIfNeeded(len);
listElmWithIdAttr.forEach(entry => {
if (entry?.id && entry.id.match(RegExp(/entry-(\d{24})$/gm))) {
const entryId = getEntryId(entry.id);
cy.get(`#list [id^=entry]`).then(elements => {
const listElmWithIdAttr = Object.values(elements);
let doneCheckOnFirst = false;
leftTextCheck(entryId, methodDict.pathLeft, methodDict.expectedText);
leftTextCheck(entryId, protocolDict.pathLeft, protocolDict.expectedTextLeft);
if (summaryDict)
leftTextCheck(entryId, summaryDict.pathLeft, summaryDict.expectedText);
cy.get('#entries-length').invoke('text').then(len => {
listElmWithIdAttr.forEach(entry => {
if (entry?.id && entry.id.match(RegExp(/entry-(\d{24})$/gm))) {
const entryId = getEntryId(entry.id);
if (!doneCheckOnFirst) {
deepCheck(funcDict, protocolDict, methodDict, entry);
doneCheckOnFirst = true;
}
}
});
resizeIfNeeded(len);
});
leftTextCheck(entryId, methodDict.pathLeft, methodDict.expectedText);
leftTextCheck(entryId, protocolDict.pathLeft, protocolDict.expectedTextLeft);
if (summaryDict)
leftTextCheck(entryId, summaryDict.pathLeft, summaryDict.expectedText);
if (!doneCheckOnFirst) {
deepCheck(funcDict, protocolDict, methodDict, entry);
doneCheckOnFirst = true;
}
}
});
});
});
});
}
export const refreshWaitTimeout = 10000;
export function waitForFetch(gt) {
cy.get('#entries-length', {timeout: refreshWaitTimeout}).should((el) => {
expect(parseInt(el.text().trim(), 10)).to.be.greaterThan(gt);
});
}
export function pauseStream() {
cy.get('#pause-icon').click();
cy.get('#pause-icon').should('not.be.visible');
}
export function getEntryId(id) {
// take the second part from the string (entry-<ID>)
return id.split('-')[1];
}
function resizeIfNeeded(entriesLen) {
if (entriesLen > maxEntriesInDom){
Cypress.config().viewportHeight === Cypress.env('normalMizuHeight') ?
resizeToHugeMizu() : resizeToNormalMizu()
}
}
function deepCheck(generalDict, protocolDict, methodDict, entry) {
const entryId = getEntryId(entry.id);
const {summary, value} = generalDict;

View File

@@ -5,6 +5,7 @@ it('opening mizu', function () {
});
const rabbitProtocolDetails = {name: 'AMQP', text: 'Advanced Message Queuing Protocol 0-9-1'};
const numberOfRecords = 5;
checkFilterByMethod({
protocol: rabbitProtocolDetails,
@@ -12,6 +13,7 @@ checkFilterByMethod({
methodQuery: 'request.method == "exchange declare"',
summary: 'exchange',
summaryQuery: 'request.exchange == "exchange"',
numberOfRecords: numberOfRecords,
value: null
});
@@ -21,6 +23,7 @@ checkFilterByMethod({
methodQuery: 'request.method == "queue declare"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
numberOfRecords: numberOfRecords,
value: null
});
@@ -30,6 +33,7 @@ checkFilterByMethod({
methodQuery: 'request.method == "queue bind"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
numberOfRecords: numberOfRecords,
value: null
});
@@ -39,6 +43,7 @@ checkFilterByMethod({
methodQuery: 'request.method == "basic publish"',
summary: 'exchange',
summaryQuery: 'request.exchange == "exchange"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.request, regex: /^message$/mg}
});
@@ -48,6 +53,7 @@ checkFilterByMethod({
methodQuery: 'request.method == "basic consume"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
numberOfRecords: numberOfRecords,
value: null
});
@@ -57,5 +63,6 @@ checkFilterByMethod({
methodQuery: 'request.method == "basic deliver"',
summary: 'exchange',
summaryQuery: 'request.queue == "exchange"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.request, regex: /^message$/mg}
});

View File

@@ -5,6 +5,7 @@ it('opening mizu', function () {
});
const redisProtocolDetails = {name: 'redis', text: 'Redis Serialization Protocol'};
const numberOfRecords = 5;
checkFilterByMethod({
protocol: redisProtocolDetails,
@@ -12,6 +13,7 @@ checkFilterByMethod({
methodQuery: 'request.command == "PING"',
summary: null,
summaryQuery: '',
numberOfRecords: numberOfRecords,
value: null
})
@@ -21,6 +23,7 @@ checkFilterByMethod({
methodQuery: 'request.command == "SET"',
summary: 'key',
summaryQuery: 'request.key == "key"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.request, regex: /^\[value, keepttl]$/mg}
})
@@ -30,6 +33,7 @@ checkFilterByMethod({
methodQuery: 'request.command == "EXISTS"',
summary: 'key',
summaryQuery: 'request.key == "key"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.response, regex: /^1$/mg}
})
@@ -39,6 +43,7 @@ checkFilterByMethod({
methodQuery: 'request.command == "GET"',
summary: 'key',
summaryQuery: 'request.key == "key"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.response, regex: /^value$/mg}
})
@@ -48,5 +53,6 @@ checkFilterByMethod({
methodQuery: 'request.command == "DEL"',
summary: 'key',
summaryQuery: 'request.key == "key"',
numberOfRecords: numberOfRecords,
value: {tab: valueTabs.response, regex: /^1$|^0$/mg}
})

View File

@@ -7,12 +7,12 @@ import {
resizeToNormalMizu,
rightOnHoverCheck,
rightTextCheck,
verifyMinimumEntries
verifyMinimumEntries,
refreshWaitTimeout,
waitForFetch,
pauseStream
} from "../testHelpers/TrafficHelper";
const refreshWaitTimeout = 10000;
const fullParam = Cypress.env('arrayDict'); // "Name:fooNamespace:barName:foo1Namespace:bar1"
const podsArray = fullParam.split('Name:').slice(1); // ["fooNamespace:bar", "foo1Namespace:bar1"]
podsArray.forEach((podStr, index) => {
@@ -70,7 +70,8 @@ checkFilter({
leftSideExpectedText: 'HTTP',
rightSidePath: '[title=HTTP]',
rightSideExpectedText: 'Hypertext Transfer Protocol -- HTTP/1.1',
applyByCtrlEnter: true
applyByCtrlEnter: true,
numberOfRecords: 20,
});
checkFilter({
@@ -79,7 +80,8 @@ checkFilter({
leftSideExpectedText: '200',
rightSidePath: '> :nth-child(2) [title="Status Code"]',
rightSideExpectedText: '200',
applyByCtrlEnter: false
applyByCtrlEnter: false,
numberOfRecords: 20
});
if (Cypress.env('shouldCheckSrcAndDest')) {
@@ -91,7 +93,8 @@ if (Cypress.env('shouldCheckSrcAndDest')) {
leftSideExpectedText: '[Unresolved]',
rightSidePath: '> :nth-child(2) [title="Source Name"]',
rightSideExpectedText: '[Unresolved]',
applyByCtrlEnter: false
applyByCtrlEnter: false,
numberOfRecords: 20
});
checkFilter({
@@ -100,7 +103,8 @@ if (Cypress.env('shouldCheckSrcAndDest')) {
leftSideExpectedText: 'httpbin.mizu-tests',
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(2) > :nth-child(3) > :nth-child(2)',
rightSideExpectedText: 'httpbin.mizu-tests',
applyByCtrlEnter: false
applyByCtrlEnter: false,
numberOfRecords: 20
});
}
@@ -110,7 +114,8 @@ checkFilter({
leftSideExpectedText: 'GET',
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
rightSideExpectedText: 'GET',
applyByCtrlEnter: true
applyByCtrlEnter: true,
numberOfRecords: 20
});
checkFilter({
@@ -119,7 +124,8 @@ checkFilter({
leftSideExpectedText: '/get',
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
rightSideExpectedText: '/get',
applyByCtrlEnter: false
applyByCtrlEnter: false,
numberOfRecords: 20
});
checkFilter({
@@ -128,7 +134,8 @@ checkFilter({
leftSideExpectedText: '127.0.0.1',
rightSidePath: '> :nth-child(2) [title="Source IP"]',
rightSideExpectedText: '127.0.0.1',
applyByCtrlEnter: false
applyByCtrlEnter: false,
numberOfRecords: 20
});
checkFilterNoResults('request.method == "POST"');
@@ -187,68 +194,56 @@ function checkFilter(filterDetails) {
rightSidePath,
rightSideExpectedText,
leftSideExpectedText,
applyByCtrlEnter
applyByCtrlEnter,
numberOfRecords
} = filterDetails;
const entriesForDeeperCheck = 5;
it(`checking the filter: ${filter}`, function () {
waitForFetch50AndPause();
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
if (!applyByCtrlEnter)
cy.get('[type="submit"]').click();
cy.get('#total-entries').should('not.have.text', '0').then(number => {
const totalEntries = number.text();
waitForFetch(numberOfRecords);
pauseStream();
cy.get(`#list [id^=entry]`).last().then(elem => {
const element = elem[0];
const entryId = getEntryId(element.id);
// checks the hover on the last entry (the only one in DOM at the beginning)
leftOnHoverCheck(entryId, leftSidePath, filter);
cy.get(`#list [id^=entry]`).last().then(elem => {
const element = elem[0];
const entryId = getEntryId(element.id);
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${filter}${applyByCtrlEnter ? '{ctrl+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
if (!applyByCtrlEnter)
cy.get('[type="submit"]').click();
// only one entry in DOM after filtering, checking all checks on it
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
leftOnHoverCheck(entryId, leftSidePath, filter);
waitForFetch50AndPause();
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, filter);
checkRightSideResponseBody();
});
// only one entry in DOM after filtering, checking all checks on it
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
leftOnHoverCheck(entryId, leftSidePath, filter);
resizeToHugeMizu();
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, filter);
checkRightSideResponseBody();
});
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
cy.get(`#list [id^=entry]`).each(elem => {
const element = elem[0];
let entryId = getEntryId(element.id);
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
});
resizeToHugeMizu();
// making the other 3 checks on the first X entries (longer time for each check)
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
// checking only 'leftTextCheck' on all entries because the rest of the checks require more time
cy.get(`#list [id^=entry]`).each(elem => {
const element = elem[0];
let entryId = getEntryId(element.id);
leftTextCheck(entryId, leftSidePath, leftSideExpectedText);
});
// making the other 3 checks on the first X entries (longer time for each check)
deeperCheck(leftSidePath, rightSidePath, filter, rightSideExpectedText, entriesForDeeperCheck);
// reloading then waiting for the entries number to load
resizeToNormalMizu();
cy.reload();
cy.get('#total-entries', {timeout: refreshWaitTimeout}).should('have.text', totalEntries);
})
// reloading then waiting for the entries number to load
resizeToNormalMizu();
cy.reload();
waitForFetch(numberOfRecords);
pauseStream();
});
}
function waitForFetch50AndPause() {
// wait half a second and pause the stream to preserve the DOM
cy.wait(500);
cy.get('#pause-icon').click();
cy.get('#pause-icon').should('not.be.visible');
}
function deeperCheck(leftSidePath, rightSidePath, filterName, rightSideExpectedText, entriesNumToCheck) {
cy.get(`#list [id^=entry]`).each((element, index) => {
if (index < entriesNumToCheck) {
@@ -273,11 +268,12 @@ function checkRightSideResponseBody() {
const decodedBody = atob(encodedBody);
const responseBody = JSON.parse(decodedBody);
const expectdJsonBody = {
args: RegExp({}),
url: RegExp('http://.*/get'),
headers: {
"User-Agent": RegExp('[REDACTED]'),
"User-Agent": RegExp('client'),
"Accept-Encoding": RegExp('gzip'),
"X-Forwarded-Uri": RegExp('/api/v1/namespaces/.*/services/.*/proxy/get')
}
@@ -294,16 +290,16 @@ function checkRightSideResponseBody() {
cy.get(`${Cypress.env('bodyJsonClass')} > `).its('length').should('be.gt', 1).then(linesNum => {
cy.get(`${Cypress.env('bodyJsonClass')} > >`).its('length').should('be.gt', linesNum).then(jsonItemsNum => {
checkPrettyAndLineNums(jsonItemsNum, decodedBody);
// checkPrettyAndLineNums(decodedBody);
clickCheckbox('Line numbers');
checkPrettyOrNothing(jsonItemsNum, decodedBody);
//clickCheckbox('Line numbers');
//checkPrettyOrNothing(jsonItemsNum, decodedBody);
clickCheckbox('Pretty');
checkPrettyOrNothing(jsonItemsNum, decodedBody);
clickCheckbox('Line numbers');
checkOnlyLineNumberes(jsonItemsNum, decodedBody);
// clickCheckbox('Pretty');
// checkPrettyOrNothing(jsonItemsNum, decodedBody);
//
// clickCheckbox('Line numbers');
// checkOnlyLineNumberes(jsonItemsNum, decodedBody);
});
});
});
@@ -313,7 +309,7 @@ function clickCheckbox(type) {
cy.contains(`${type}`).prev().children().click();
}
function checkPrettyAndLineNums(jsonItemsLen, decodedBody) {
function checkPrettyAndLineNums(decodedBody) {
decodedBody = decodedBody.replaceAll(' ', '');
cy.get(`${Cypress.env('bodyJsonClass')} >`).then(elements => {
const lines = Object.values(elements);

View File

@@ -54,3 +54,5 @@ replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../tap/dbgctl

View File

@@ -343,6 +343,7 @@ func TestTapRedact(t *testing.T) {
tapNamespace := GetDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmdArgs = append(tapCmdArgs, "--redact")
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
@@ -394,8 +395,6 @@ func TestTapNoRedact(t *testing.T) {
tapNamespace := GetDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmdArgs = append(tapCmdArgs, "--no-redact")
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
@@ -446,6 +445,8 @@ func TestTapRegexMasking(t *testing.T) {
tapNamespace := GetDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmdArgs = append(tapCmdArgs, "--redact")
tapCmdArgs = append(tapCmdArgs, "-r", "Mizu")
tapCmd := exec.Command(cliPath, tapCmdArgs...)

View File

@@ -7,6 +7,7 @@ require (
github.com/chanced/openapi v0.0.8
github.com/djherbis/atime v1.1.0
github.com/getkin/kin-openapi v0.89.0
github.com/gin-contrib/pprof v1.3.0
github.com/gin-contrib/static v0.0.1
github.com/gin-gonic/gin v1.7.7
github.com/go-playground/locales v0.14.0
@@ -24,6 +25,7 @@ require (
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
github.com/up9inc/mizu/tap/dbgctl v0.0.0
github.com/up9inc/mizu/tap/extensions/amqp v0.0.0
github.com/up9inc/mizu/tap/extensions/http v0.0.0
github.com/up9inc/mizu/tap/extensions/kafka v0.0.0
@@ -60,6 +62,7 @@ require (
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
@@ -101,15 +104,20 @@ require (
github.com/russross/blackfriday v1.6.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/segmentio/kafka-go v0.4.27 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spf13/cobra v1.3.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/struCoder/pidusage v0.2.1 // indirect
github.com/tidwall/gjson v1.14.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.4 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
@@ -151,3 +159,5 @@ replace github.com/up9inc/mizu/tap/extensions/http v0.0.0 => ../tap/extensions/h
replace github.com/up9inc/mizu/tap/extensions/kafka v0.0.0 => ../tap/extensions/kafka
replace github.com/up9inc/mizu/tap/extensions/redis v0.0.0 => ../tap/extensions/redis
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../tap/dbgctl

View File

@@ -213,10 +213,13 @@ github.com/getkin/kin-openapi v0.89.0 h1:p4nagHchUKGn85z/f+pse4aSh50nIBOYjOhMIku
github.com/getkin/kin-openapi v0.89.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/pprof v1.3.0 h1:G9eK6HnbkSqDZBYbzG4wrjCsA4e+cvYAHUZw6W+W9K0=
github.com/gin-contrib/pprof v1.3.0/go.mod h1:waMjT1H9b179t3CxuG1cV3DHpga6ybizwfBaM5OXaB0=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-contrib/static v0.0.1 h1:JVxuvHPuUfkoul12N7dtQw7KRn/pSMq7Ue1Va9Swm1U=
github.com/gin-contrib/static v0.0.1/go.mod h1:CSxeF+wep05e0kCOsqWdAWbSszmc31zTIbD8TvWl7Hs=
github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
@@ -238,6 +241,8 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -618,6 +623,8 @@ github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjK
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -659,6 +666,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.12.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
@@ -672,6 +681,10 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso
github.com/tidwall/sjson v1.2.3/go.mod h1:5WdjKx3AQMvCJ4RG6/2UYT7dLrGvJUV1x4jdTAyGvZs=
github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc=
github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
@@ -702,6 +715,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/api/v3 v3.5.1/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
@@ -894,6 +909,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -962,6 +978,7 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220207234003-57398862261d h1:Bm7BNOQt2Qv7ZqysjeLjgCBanX+88Z/OtdvsrEv1Djc=
golang.org/x/sys v0.0.0-20220207234003-57398862261d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

View File

@@ -14,6 +14,7 @@ import (
"syscall"
"time"
"github.com/gin-contrib/pprof"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
@@ -36,6 +37,7 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -45,6 +47,7 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP
var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)")
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
var profiler = flag.Bool("profiler", false, "Run pprof server")
const (
socketConnectionRetries = 30
@@ -61,7 +64,7 @@ func main() {
app.LoadExtensions()
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
panic("One of the flags --tap, --api-server, --standalone or --hars-read must be provided")
}
if *standaloneMode {
@@ -69,7 +72,14 @@ func main() {
} else if *tapperMode {
runInTapperMode()
} else if *apiServerMode {
utils.StartServer(runInApiServerMode(*namespace))
app := runInApiServerMode(*namespace)
if *profiler {
pprof.Register(app)
}
utils.StartServer(app)
} else if *harsReaderMode {
runInHarReaderMode()
}
@@ -152,7 +162,9 @@ func runInTapperMode() {
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapOpts := &tap.TapOpts{
HostMode: hostMode,
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
@@ -198,7 +210,7 @@ func runInHarReaderMode() {
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.Start(nil)
oasGenerator.Start()
}
if config.Config.ServiceMap {
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
@@ -281,6 +293,10 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
continue
}
if dbgctl.MizuTapperDisableSending {
continue
}
// NOTE: This is where the `*tapApi.OutputChannelItem` leaves the code
// and goes into the intermediate WebSocket.
err = connection.WriteMessage(websocket.TextMessage, marshaledData)

View File

@@ -18,6 +18,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/holder"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/resolver"
@@ -152,6 +153,9 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink)
oasGenerator.HandleEntry(mizuEntry)
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/dbgctl"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"
@@ -24,36 +25,38 @@ var (
)
func LoadExtensions() {
Extensions = make([]*tapApi.Extension, 4)
Extensions = make([]*tapApi.Extension, 0)
ExtensionsMap = make(map[string]*tapApi.Extension)
extensionAmqp := &tapApi.Extension{}
dissectorAmqp := amqpExt.NewDissector()
dissectorAmqp.Register(extensionAmqp)
extensionAmqp.Dissector = dissectorAmqp
Extensions[0] = extensionAmqp
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
extensionHttp := &tapApi.Extension{}
dissectorHttp := httpExt.NewDissector()
dissectorHttp.Register(extensionHttp)
extensionHttp.Dissector = dissectorHttp
Extensions[1] = extensionHttp
Extensions = append(Extensions, extensionHttp)
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
extensionKafka := &tapApi.Extension{}
dissectorKafka := kafkaExt.NewDissector()
dissectorKafka.Register(extensionKafka)
extensionKafka.Dissector = dissectorKafka
Extensions[2] = extensionKafka
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
if !dbgctl.MizuTapperDisableNonHttpExtensions {
extensionAmqp := &tapApi.Extension{}
dissectorAmqp := amqpExt.NewDissector()
dissectorAmqp.Register(extensionAmqp)
extensionAmqp.Dissector = dissectorAmqp
Extensions = append(Extensions, extensionAmqp)
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
extensionRedis := &tapApi.Extension{}
dissectorRedis := redisExt.NewDissector()
dissectorRedis.Register(extensionRedis)
extensionRedis.Dissector = dissectorRedis
Extensions[3] = extensionRedis
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
extensionKafka := &tapApi.Extension{}
dissectorKafka := kafkaExt.NewDissector()
dissectorKafka.Register(extensionKafka)
extensionKafka.Dissector = dissectorKafka
Extensions = append(Extensions, extensionKafka)
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
extensionRedis := &tapApi.Extension{}
dissectorRedis := redisExt.NewDissector()
dissectorRedis.Register(extensionRedis)
extensionRedis.Dissector = dissectorRedis
Extensions = append(Extensions, extensionRedis)
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
}
sort.Slice(Extensions, func(i, j int) bool {
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority

View File

@@ -1,17 +1,12 @@
package controllers
import (
"bytes"
basenine "github.com/up9inc/basenine/client/go"
"net"
"net/http/httptest"
"testing"
"time"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas"
)
func TestGetOASServers(t *testing.T) {
@@ -37,33 +32,14 @@ func TestGetOASSpec(t *testing.T) {
t.Logf("Written body: %s", recorder.Body.String())
}
type fakeConn struct {
sendBuffer *bytes.Buffer
receiveBuffer *bytes.Buffer
}
func (f fakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) }
func (f fakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) }
func (fakeConn) Close() error { return nil }
func (fakeConn) LocalAddr() net.Addr { return nil }
func (fakeConn) RemoteAddr() net.Addr { return nil }
func (fakeConn) SetDeadline(t time.Time) error { return nil }
func (fakeConn) SetReadDeadline(t time.Time) error { return nil }
func (fakeConn) SetWriteDeadline(t time.Time) error { return nil }
func getRecorderAndContext() (*httptest.ResponseRecorder, *gin.Context) {
dummyConn := new(basenine.Connection)
dummyConn.Conn = fakeConn{
sendBuffer: bytes.NewBufferString("\n"),
receiveBuffer: bytes.NewBufferString("\n"),
}
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} {
return oas.GetDefaultOasGeneratorInstance()
})
recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder)
oas.GetDefaultOasGeneratorInstance().Start(dummyConn)
oas.GetDefaultOasGeneratorInstance().Start()
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
return recorder, c
}

View File

@@ -1,14 +1,11 @@
package oas
import (
"context"
"encoding/json"
"net/url"
"sync"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/logger"
@@ -19,22 +16,20 @@ var (
instance *defaultOasGenerator
)
type OasGeneratorSink interface {
HandleEntry(mizuEntry *api.Entry)
}
type OasGenerator interface {
Start(conn *basenine.Connection)
Start()
Stop()
IsStarted() bool
GetServiceSpecs() *sync.Map
SetEntriesQuery(query string) bool
}
type defaultOasGenerator struct {
started bool
ctx context.Context
cancel context.CancelFunc
serviceSpecs *sync.Map
dbConn *basenine.Connection
dbMutex sync.Mutex
entriesQuery string
}
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
@@ -45,102 +40,29 @@ func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
return instance
}
func (g *defaultOasGenerator) Start(conn *basenine.Connection) {
if g.started {
return
}
if g.dbConn == nil {
if conn == nil {
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err)
return
}
conn = newConn
}
g.dbConn = conn
}
ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel
g.ctx = ctx
g.serviceSpecs = &sync.Map{}
func (g *defaultOasGenerator) Start() {
g.started = true
go g.runGenerator()
}
func (g *defaultOasGenerator) Stop() {
if !g.started {
return
}
g.started = false
g.cancel()
g.reset()
g.dbMutex.Lock()
defer g.dbMutex.Unlock()
if g.dbConn != nil {
g.dbConn.Close()
g.dbConn = nil
}
}
func (g *defaultOasGenerator) IsStarted() bool {
return g.started
}
func (g *defaultOasGenerator) runGenerator() {
// Make []byte channels to receive the data and the meta
dataChan := make(chan []byte)
metaChan := make(chan []byte)
g.dbMutex.Lock()
defer g.dbMutex.Unlock()
logger.Log.Infof("Querying DB for OAS generator with query '%s'", g.entriesQuery)
if err := g.dbConn.Query("latest", g.entriesQuery, dataChan, metaChan); err != nil {
logger.Log.Errorf("Query mode call failed: %v", err)
func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) {
if !g.started {
return
}
for {
select {
case <-g.ctx.Done():
logger.Log.Infof("OAS Generator was canceled")
close(dataChan)
close(metaChan)
return
case metaBytes, ok := <-metaChan:
if !ok {
logger.Log.Infof("OAS Generator - meta channel closed")
break
}
logger.Log.Debugf("Meta: %s", metaBytes)
case dataBytes, ok := <-dataChan:
if !ok {
logger.Log.Infof("OAS Generator - entries channel closed")
break
}
logger.Log.Debugf("Data: %s", dataBytes)
e := new(api.Entry)
err := json.Unmarshal(dataBytes, e)
if err != nil {
continue
}
g.handleEntry(e)
}
}
}
func (g *defaultOasGenerator) handleEntry(mizuEntry *api.Entry) {
if mizuEntry.Protocol.Name == "http" {
dest := mizuEntry.Destination.Name
if dest == "" {
@@ -210,18 +132,9 @@ func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map {
return g.serviceSpecs
}
func (g *defaultOasGenerator) SetEntriesQuery(query string) bool {
changed := g.entriesQuery != query
g.entriesQuery = query
return changed
}
func NewDefaultOasGenerator() *defaultOasGenerator {
return &defaultOasGenerator{
started: false,
ctx: nil,
cancel: nil,
serviceSpecs: nil,
dbConn: nil,
serviceSpecs: &sync.Map{},
}
}

View File

@@ -8,7 +8,7 @@ import (
)
func TestOASGen(t *testing.T) {
gen := new(defaultOasGenerator)
gen := GetDefaultOasGeneratorInstance()
e := new(har.Entry)
err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e)
@@ -21,8 +21,7 @@ func TestOASGen(t *testing.T) {
Entry: *e,
}
dummyConn := GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
gen.Start(dummyConn)
gen.Start()
gen.handleHARWithSource(ews)
g, ok := gen.serviceSpecs.Load("some")
if !ok {

View File

@@ -1,10 +1,8 @@
package oas
import (
"bytes"
"encoding/json"
"io/ioutil"
"net"
"os"
"regexp"
"strings"
@@ -13,22 +11,11 @@ import (
"time"
"github.com/chanced/openapi"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/logger"
"github.com/wI2L/jsondiff"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/har"
)
func GetFakeDBConn(send string) *basenine.Connection {
dummyConn := new(basenine.Connection)
dummyConn.Conn = FakeConn{
sendBuffer: bytes.NewBufferString(send),
receiveBuffer: bytes.NewBufferString(""),
}
return dummyConn
}
// if started via env, write file into subdir
func outputSpec(label string, spec *openapi.OpenAPI, t *testing.T) string {
content, err := json.MarshalIndent(spec, "", " ")
@@ -278,17 +265,3 @@ func TestLoadValid3_1(t *testing.T) {
t.FailNow()
}
}
type FakeConn struct {
sendBuffer *bytes.Buffer
receiveBuffer *bytes.Buffer
}
func (f FakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) }
func (f FakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) }
func (FakeConn) Close() error { return nil }
func (FakeConn) LocalAddr() net.Addr { return nil }
func (FakeConn) RemoteAddr() net.Addr { return nil }
func (FakeConn) SetDeadline(t time.Time) error { return nil }
func (FakeConn) SetReadDeadline(t time.Time) error { return nil }
func (FakeConn) SetWriteDeadline(t time.Time) error { return nil }

View File

@@ -114,7 +114,7 @@ func init() {
tapCmd.Flags().Bool(configStructs.AnalysisTapName, defaultTapConfig.Analysis, "Uploads traffic to UP9 for further analysis (Beta)")
tapCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces")
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().Bool(configStructs.EnableRedactionTapName, defaultTapConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().String(configStructs.InsertionFilterName, defaultTapConfig.InsertionFilter, "Set the insertion filter. Accepts string or a file path.")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
@@ -123,4 +123,5 @@ func init() {
tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts")
tapCmd.Flags().Bool(configStructs.ServiceMeshName, defaultTapConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
tapCmd.Flags().Bool(configStructs.TlsName, defaultTapConfig.Tls, "Record tls traffic")
tapCmd.Flags().Bool(configStructs.ProfilerName, defaultTapConfig.Profiler, "Run pprof server")
}

View File

@@ -124,7 +124,7 @@ func RunMizuTap() {
}
logger.Log.Infof("Waiting for Mizu Agent to start...")
if state.mizuServiceAccountExists, err = resources.CreateTapMizuResources(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace, config.Config.AgentImage, getSyncEntriesConfig(), config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.ApiServerResources, config.Config.ImagePullPolicy(), config.Config.LogLevel()); err != nil {
if state.mizuServiceAccountExists, err = resources.CreateTapMizuResources(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace, config.Config.AgentImage, getSyncEntriesConfig(), config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.ApiServerResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
logger.Log.Info("Mizu is already running in this namespace, change the `mizu-resources-namespace` configuration or run `mizu clean` to remove the currently running Mizu instance")
@@ -291,7 +291,7 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
DisableRedaction: config.Config.Tap.DisableRedaction,
EnableRedaction: config.Config.Tap.EnableRedaction,
}, nil
}

View File

@@ -21,7 +21,7 @@ const (
AnalysisTapName = "analysis"
AllNamespacesTapName = "all-namespaces"
PlainTextFilterRegexesTapName = "regex-masking"
DisableRedactionTapName = "no-redact"
EnableRedactionTapName = "redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
InsertionFilterName = "insertion-filter"
DryRunTapName = "dry-run"
@@ -30,6 +30,7 @@ const (
ContractFile = "contract"
ServiceMeshName = "service-mesh"
TlsName = "tls"
ProfilerName = "profiler"
)
type TapConfig struct {
@@ -42,7 +43,7 @@ type TapConfig struct {
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
PlainTextFilterRegexes []string `yaml:"regex-masking"`
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
EnableRedaction bool `yaml:"redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
InsertionFilter string `yaml:"insertion-filter" default:""`
DryRun bool `yaml:"dry-run" default:"false"`
@@ -54,6 +55,7 @@ type TapConfig struct {
TapperResources shared.Resources `yaml:"tapper-resources"`
ServiceMesh bool `yaml:"service-mesh" default:"false"`
Tls bool `yaml:"tls" default:"false"`
Profiler bool `yaml:"profiler" default:"false"`
}
func (config *TapConfig) PodRegex() *regexp.Regexp {

View File

@@ -74,6 +74,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday v1.6.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect
golang.org/x/crypto v0.0.0-20220208050332-20e1d8d225ab // indirect
@@ -104,3 +105,5 @@ replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../tap/dbgctl

View File

@@ -14,7 +14,7 @@ import (
core "k8s.io/api/core/v1"
)
func CreateTapMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string, isNsRestrictedMode bool, mizuResourcesNamespace string, agentImage string, syncEntriesConfig *shared.SyncEntriesConfig, maxEntriesDBSizeBytes int64, apiServerResources shared.Resources, imagePullPolicy core.PullPolicy, logLevel logging.Level) (bool, error) {
func CreateTapMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string, isNsRestrictedMode bool, mizuResourcesNamespace string, agentImage string, syncEntriesConfig *shared.SyncEntriesConfig, maxEntriesDBSizeBytes int64, apiServerResources shared.Resources, imagePullPolicy core.PullPolicy, logLevel logging.Level, profiler bool) (bool, error) {
if !isNsRestrictedMode {
if err := createMizuNamespace(ctx, kubernetesProvider, mizuResourcesNamespace); err != nil {
return false, err
@@ -50,6 +50,7 @@ func CreateTapMizuResources(ctx context.Context, kubernetesProvider *kubernetes.
Resources: apiServerResources,
ImagePullPolicy: imagePullPolicy,
LogLevel: logLevel,
Profiler: profiler,
}
if err := createMizuApiServerPod(ctx, kubernetesProvider, opts); err != nil {

View File

@@ -0,0 +1,107 @@
# Performance analysis
This directory contains tools for analyzing tapper performance.
# Periodic tapper logs
In tapper logs there are some periodic lines that shows its internal state and consumed resources.
Internal state example (formatted and commented):
```
stats - {
"processedBytes":468940592, // how many bytes we read from pcap
"packetsCount":174883, // how many packets we read from pcap
"tcpPacketsCount":174883, // how many tcp packets we read from pcap
"reassembledTcpPayloadsCount":66893, // how many chunks sent to tcp stream
"matchedPairs":24821, // how many request response pairs found
"droppedTcpStreams":2 // how many tcp streams remained stale and dropped
}
```
Consumed resources example (formatted and commented):
```
mem: 24441240, // golang heap size
goroutines: 29, // how many goroutines
cpu: 91.208791, // how much cpu the tapper process consume (in percentage per core)
cores: 16, // how many cores there are on the machine
rss: 87052288 // how many bytes held by the tapper process
```
# Plot tapper logs
In order to plot a tapper log or many logs into a graph, use the `plot_from_tapper_logs.py` util.
It gets a list of tapper logs as a parameter, and output an image with a nice graph.
The log file names should be named in this format `XX_DESCRIPTION.log` when XX is the number between determining the color of the output graph and description is the name of the series. It allows for easy comparison between various modes.
Example run:
```
cd $MIZU_HOME/performance_analysis
virtualenv venv
source venv/bin/activate
pip install -r requirements.txt
python plot_from_tapper_logs.py 00_tapper.log
```
# Tapper Modes
Every packet seen by the tapper is processed in a pipeline that contains various stages.
* Pcap - Read the packet from libpcap
* Assembler - Assemble the packet into a TcpStream
* TcpStream - Hold stream information and TcpReaders
* Dissectors - Read from TcpReader and recognize the packet content and protocol.
* Emit - Marshal the request response pair into a Json
* Send - Send the Json to Api Server
Tapper can be run with various debug modes:
* No Pcap - Start the tapper process, but don't read from any packets from pcap
* No Assembler - Read packets from pcap, but don't assemble them
* No TcpStream - Assemble the packets, but don't create TcpStream for them
* No Dissectors - Create a TcpStream for the packets, but don't dissect their content
* No Emit - Dissect the TcpStream, but don't emit the matched request response pair
* No Send - Emit the request response pair, but don't send them to the Api Server.
* Regular mode
![Tapper Modes](https://github.com/up9inc/mizu/blob/debug/profile-tapper-benchmark/performance_analysis/tapper-modes.png)
# Run benchmark with various tapper modes
## Prerequisite
In order to run the benchmark you probably want:
1. An up and running Api Server
2. An up and running Basenine
3. An up and running UI (optional)
4. An up and running test server, like nginx, that can return a known payload at a known endpoint.
5. Set MIZU_HOME environment variable to points to mizu directory
6. Install the `hey` tool
## Running the benchmark
In order to run a benchmark use the `run_tapper_benchmark.sh` script.
Example run:
```
cd $MIZU_HOME/performance_analysis
source venv/bin/activate # Assuming you already run plot_from_tapper_logs.py
./run_tapper_benchmark.sh
```
Running it without params use the default values, use the following environment variables for customization:
```
export=MIZU_BENCHMARK_OUTPUT_DIR=/path/to/dir # Set the output directory for tapper logs and graph
export=MIZU_BENCHMARK_CLIENT_PERIOD=1m # How long each test run
export=MIZU_BENCHMARK_URL=http://server:port/path # The URL to use for the benchmarking process (the test server endpoint)
export=MIZU_BENCHMARK_RUN_COUNT=3 # How many times each tapper mode should run
export=MIZU_BENCHMARK_QPS=250 # How many queries per second the each client should send to the test server
export=MIZU_BENCHMARK_CLIENTS_COUNT=5 # How many clients should run in parallel during the benchmark
```
# Example output graph
An example output graph from a 15 min run with 15K payload and 1000 QPS looks like
![Example Graph](https://github.com/up9inc/mizu/blob/debug/profile-tapper-benchmark/performance_analysis/example-graph.png)

Binary file not shown.

After

Width:  |  Height:  |  Size: 327 KiB

View File

@@ -0,0 +1,182 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pathlib
import re
import sys
import typing
COLORMAP = plt.get_cmap('turbo')
# Extract cpu and rss samples from log files and plot them
# Input: List of log files
#
# example:
# python plot_from_tapper_logs.py 01_no_pcap_01.log 99_normal_00.log
#
# The script assumes that the log file names start with a number (pattern '\d+')
# and groups based on this number. Files that start will the same number will be plotted with the same color.
# Change group_pattern to an empty string to disable this, or change to a regex of your liking.
def get_sample(name: str, line: str, default_value: float):
pattern = name + r': ?(\d+(\.\d+)?)'
maybe_sample = re.findall(pattern, line)
if len(maybe_sample) == 0:
return default_value
sample = float(maybe_sample[0][0])
return sample
def append_sample(name: str, line: str, samples: typing.List[float]):
sample = get_sample(name, line, -1)
if sample == -1:
return
samples.append(sample)
def extract_samples(f: typing.IO) -> typing.Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
cpu_samples = []
rss_samples = []
count_samples = []
matched_samples = []
live_samples = []
processed_samples = []
heap_samples = []
goroutines_samples = []
for line in f:
append_sample('cpu', line, cpu_samples)
append_sample('rss', line, rss_samples)
ignored_packets_count = get_sample('"ignoredPacketsCount"', line, -1)
packets_count = get_sample('"packetsCount"', line, -1)
if ignored_packets_count != -1 and packets_count != -1:
count_samples.append(packets_count - ignored_packets_count)
append_sample('"matchedPairs"', line, matched_samples)
append_sample('"liveTcpStreams"', line, live_samples)
append_sample('"processedBytes"', line, processed_samples)
append_sample('mem', line, heap_samples)
append_sample('goroutines', line, goroutines_samples)
cpu_samples = pd.Series(cpu_samples)
rss_samples = pd.Series(rss_samples)
count_samples = pd.Series(count_samples)
matched_samples = pd.Series(matched_samples)
live_samples = pd.Series(live_samples)
processed_samples = pd.Series(processed_samples)
heap_samples = pd.Series(heap_samples)
goroutines_samples = pd.Series(goroutines_samples)
return cpu_samples, rss_samples, count_samples, matched_samples, live_samples, processed_samples, heap_samples, goroutines_samples
def plot(ax, df: pd.DataFrame, title: str, xlabel: str, ylabel: str, group_pattern: typing.Optional[str]):
if group_pattern:
color = get_group_color(df.columns, group_pattern)
df.plot(color=color, ax=ax)
else:
df.plot(cmap=COLORMAP, ax=ax)
ax.ticklabel_format(style='plain')
plt.title(title)
plt.legend()
plt.xlabel(xlabel)
plt.ylabel(ylabel)
def get_group_color(names, pattern):
props = [int(re.findall(pattern, pathlib.Path(name).name)[0]) for name in names]
key = dict(zip(sorted(list(set(props))), range(len(set(props)))))
n_colors = len(key)
color_options = plt.get_cmap('jet')(np.linspace(0, 1, n_colors))
groups = [key[prop] for prop in props]
color = color_options[groups] # type: ignore
return color
if __name__ == '__main__':
filenames = sys.argv[1:]
cpu_samples_all_files = []
rss_samples_all_files = []
count_samples_all_files = []
matched_samples_all_files = []
live_samples_all_files = []
processed_samples_all_files = []
heap_samples_all_files = []
goroutines_samples_all_files = []
for ii, filename in enumerate(filenames):
print("Analyzing {}".format(filename))
with open(filename, 'r') as f:
cpu_samples, rss_samples, count_samples, matched_samples, live_samples, processed_samples, heap_samples, goroutines_samples = extract_samples(f)
cpu_samples.name = pathlib.Path(filename).name
rss_samples.name = pathlib.Path(filename).name
count_samples.name = pathlib.Path(filename).name
matched_samples.name = pathlib.Path(filename).name
live_samples.name = pathlib.Path(filename).name
processed_samples.name = pathlib.Path(filename).name
heap_samples.name = pathlib.Path(filename).name
goroutines_samples.name = pathlib.Path(filename).name
cpu_samples_all_files.append(cpu_samples)
rss_samples_all_files.append(rss_samples)
count_samples_all_files.append(count_samples)
matched_samples_all_files.append(matched_samples)
live_samples_all_files.append(live_samples)
processed_samples_all_files.append(processed_samples)
heap_samples_all_files.append(heap_samples)
goroutines_samples_all_files.append(goroutines_samples)
cpu_samples_df = pd.concat(cpu_samples_all_files, axis=1)
rss_samples_df = pd.concat(rss_samples_all_files, axis=1)
count_samples_df = pd.concat(count_samples_all_files, axis=1)
matched_samples_df = pd.concat(matched_samples_all_files, axis=1)
live_samples_df = pd.concat(live_samples_all_files, axis=1)
processed_samples_df = pd.concat(processed_samples_all_files, axis=1)
heap_samples_df = pd.concat(heap_samples_all_files, axis=1)
goroutines_samples_df = pd.concat(goroutines_samples_all_files, axis=1)
group_pattern = r'^\d+'
cpu_plot = plt.subplot(8, 2, 1)
plot(cpu_plot, cpu_samples_df, 'cpu', '', 'cpu (%)', group_pattern)
cpu_plot.legend().remove()
mem_plot = plt.subplot(8, 2, 2)
plot(mem_plot, (rss_samples_df / 1024 / 1024), 'rss', '', 'mem (mega)', group_pattern)
mem_plot.legend(loc='center left', bbox_to_anchor=(1, 0.5))
packets_plot = plt.subplot(8, 2, 3)
plot(packets_plot, count_samples_df, 'packetsCount', '', 'packetsCount', group_pattern)
packets_plot.legend().remove()
matched_plot = plt.subplot(8, 2, 4)
plot(matched_plot, matched_samples_df, 'matchedCount', '', 'matchedCount', group_pattern)
matched_plot.legend().remove()
live_plot = plt.subplot(8, 2, 5)
plot(live_plot, live_samples_df, 'liveStreamsCount', '', 'liveStreamsCount', group_pattern)
live_plot.legend().remove()
processed_plot = plt.subplot(8, 2, 6)
plot(processed_plot, (processed_samples_df / 1024 / 1024), 'processedBytes', '', 'bytes (mega)', group_pattern)
processed_plot.legend().remove()
heap_plot = plt.subplot(8, 2, 7)
plot(heap_plot, (heap_samples_df / 1024 / 1024), 'heap', '', 'heap (mega)', group_pattern)
heap_plot.legend().remove()
goroutines_plot = plt.subplot(8, 2, 8)
plot(goroutines_plot, goroutines_samples_df, 'goroutines', '', 'goroutines', group_pattern)
goroutines_plot.legend().remove()
fig = plt.gcf()
fig.set_size_inches(20, 18)
print('Saving graph to graph.png')
plt.savefig('graph.png', bbox_inches='tight')

View File

@@ -0,0 +1,2 @@
matplotlib
pandas

View File

@@ -0,0 +1,100 @@
#!/bin/bash
[ -z "$MIZU_HOME" ] && { echo "MIZU_HOME is missing"; exit 1; }
[ -z "$MIZU_BENCHMARK_OUTPUT_DIR" ] && export MIZU_BENCHMARK_OUTPUT_DIR="/tmp/mizu-benchmark-results-$(date +%d-%m-%H-%M)"
[ -z "$MIZU_BENCHMARK_CLIENT_PERIOD" ] && export MIZU_BENCHMARK_CLIENT_PERIOD="1m"
[ -z "$MIZU_BENCHMARK_URL" ] && export MIZU_BENCHMARK_URL="http://localhost:8081/data/b.1000.json"
[ -z "$MIZU_BENCHMARK_RUN_COUNT" ] && export MIZU_BENCHMARK_RUN_COUNT="3"
[ -z "$MIZU_BENCHMARK_QPS" ] && export MIZU_BENCHMARK_QPS="500"
[ -z "$MIZU_BENCHMARK_CLIENTS_COUNT" ] && export MIZU_BENCHMARK_CLIENTS_COUNT="5"
function log() {
local message=$@
printf "[%s] %s\n" "$(date "+%d-%m %H:%M:%S")" "$message"
}
function run_single_bench() {
local mode_num=$1
local mode_str=$2
log "Starting ${mode_num}_${mode_str} (runs: $MIZU_BENCHMARK_RUN_COUNT) (period: $MIZU_BENCHMARK_CLIENT_PERIOD)"
for ((i=0;i<"$MIZU_BENCHMARK_RUN_COUNT";i++)); do
log " $i: Running tapper"
rm -f tapper.log
tapper_args=("--tap" "--api-server-address" "ws://localhost:8899/wsTapper" "-stats" "10" "-ignore-ports" "8899,9099")
if [[ $(uname) == "Darwin" ]]
then
tapper_args+=("-i" "lo0" "-"decoder "Loopback")
else
tapper_args+=("-i" "lo")
fi
nohup ./agent/build/mizuagent ${tapper_args[@]} > tapper.log 2>&1 &
log " $i: Running client (hey)"
hey -z $MIZU_BENCHMARK_CLIENT_PERIOD -c $MIZU_BENCHMARK_CLIENTS_COUNT -q $MIZU_BENCHMARK_QPS $MIZU_BENCHMARK_URL > /dev/null || return 1
log " $i: Killing tapper"
kill -9 $(ps -ef | grep agent/build/mizuagent | grep tap | grep -v grep | awk '{ print $2 }') > /dev/null 2>&1
local output_file=$MIZU_BENCHMARK_OUTPUT_DIR/${mode_num}_${mode_str}_${i}.log
log " $i: Moving output to $output_file"
mv tapper.log $output_file || return 1
done
}
function generate_bench_graph() {
cd performance_analysis/ || return 1
source venv/bin/activate
python plot_from_tapper_logs.py $MIZU_BENCHMARK_OUTPUT_DIR/*.log || return 1
mv graph.png $MIZU_BENCHMARK_OUTPUT_DIR || return 1
}
mkdir -p $MIZU_BENCHMARK_OUTPUT_DIR
rm -f $MIZU_BENCHMARK_OUTPUT_DIR/*
log "Writing output to $MIZU_BENCHMARK_OUTPUT_DIR"
cd $MIZU_HOME || exit 1
export HOST_MODE=0
export SENSITIVE_DATA_FILTERING_OPTIONS='{"EnableRedaction": false}'
export MIZU_DEBUG_DISABLE_PCAP=false
export MIZU_DEBUG_DISABLE_TCP_REASSEMBLY=false
export MIZU_DEBUG_DISABLE_TCP_STREAM=false
export MIZU_DEBUG_DISABLE_NON_HTTP_EXTENSSION=false
export MIZU_DEBUG_DISABLE_DISSECTORS=false
export MIZU_DEBUG_DISABLE_EMITTING=false
export MIZU_DEBUG_DISABLE_SENDING=false
export MIZU_DEBUG_DISABLE_PCAP=true
run_single_bench "01" "no_pcap" || exit 1
export MIZU_DEBUG_DISABLE_PCAP=false
export MIZU_DEBUG_DISABLE_TCP_REASSEMBLY=true
run_single_bench "02" "no_assembler" || exit 1
export MIZU_DEBUG_DISABLE_TCP_REASSEMBLY=false
export MIZU_DEBUG_DISABLE_TCP_STREAM=true
run_single_bench "03" "no_tcp_stream" || exit 1
export MIZU_DEBUG_DISABLE_TCP_STREAM=false
export MIZU_DEBUG_DISABLE_NON_HTTP_EXTENSSION=true
run_single_bench "04" "only_http" || exit 1
export MIZU_DEBUG_DISABLE_NON_HTTP_EXTENSSION=false
export MIZU_DEBUG_DISABLE_DISSECTORS=true
run_single_bench "05" "no_dissectors" || exit 1
export MIZU_DEBUG_DISABLE_DISSECTORS=false
export MIZU_DEBUG_DISABLE_EMITTING=true
run_single_bench "06" "no_emit" || exit 1
export MIZU_DEBUG_DISABLE_EMITTING=false
export MIZU_DEBUG_DISABLE_SENDING=true
run_single_bench "07" "no_send" || exit 1
export MIZU_DEBUG_DISABLE_SENDING=false
run_single_bench "08" "normal" || exit 1
generate_bench_graph || exit 1
log "Output written to to $MIZU_BENCHMARK_OUTPUT_DIR"

Binary file not shown.

After

Width:  |  Height:  |  Size: 259 KiB

View File

@@ -67,6 +67,7 @@ require (
github.com/spf13/cobra v1.3.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect
golang.org/x/crypto v0.0.0-20220208050332-20e1d8d225ab // indirect
@@ -95,3 +96,5 @@ require (
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../tap/dbgctl

View File

@@ -181,6 +181,7 @@ type ApiServerOptions struct {
Resources shared.Resources
ImagePullPolicy core.PullPolicy
LogLevel logging.Level
Profiler bool
}
func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, mountVolumeClaim bool, volumeClaimName string, createAuthContainer bool) (*core.Pod, error) {
@@ -212,7 +213,15 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun
return nil, fmt.Errorf("invalid memory request for %s container", opts.PodName)
}
command := []string{"./mizuagent", "--api-server"}
command := []string{
"./mizuagent",
"--api-server",
}
if opts.Profiler {
command = append(command, "--profiler")
}
if opts.IsNamespaceRestricted {
command = append(command, "--namespace", opts.Namespace)
}
@@ -383,6 +392,16 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun
Volumes: volumes,
DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64),
Tolerations: []core.Toleration{
{
Operator: core.TolerationOpExists,
Effect: core.TaintEffectNoExecute,
},
{
Operator: core.TolerationOpExists,
Effect: core.TaintEffectNoSchedule,
},
},
},
}

View File

@@ -15,6 +15,8 @@ import (
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap/dbgctl"
)
const mizuTestEnvVar = "MIZU_TEST"
@@ -104,11 +106,6 @@ type OutputChannelItem struct {
Namespace string
}
type ProtoIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
type ReadProgress struct {
readBytes int
lastCurrent int
@@ -123,6 +120,11 @@ func (p *ReadProgress) Current() (n int) {
return p.lastCurrent
}
func (p *ReadProgress) Reset() {
p.readBytes = 0
p.lastCurrent = 0
}
type Dissector interface {
Register(*Extension)
Ping()
@@ -149,8 +151,13 @@ type Emitter interface {
}
func (e *Emitting) Emit(item *OutputChannelItem) {
e.OutputChannel <- item
e.AppStats.IncMatchedPairs()
if dbgctl.MizuTapperDisableEmitting {
return
}
e.OutputChannel <- item
}
type Entry struct {
@@ -419,13 +426,11 @@ type TcpReader interface {
GetCaptureTime() time.Time
GetEmitter() Emitter
GetIsClosed() bool
GetExtension() *Extension
}
type TcpStream interface {
SetProtocol(protocol *Protocol)
GetOrigin() Capture
GetProtoIdentifier() *ProtoIdentifier
GetReqResMatchers() []RequestResponseMatcher
GetIsTapTarget() bool
GetIsClosed() bool

View File

@@ -2,4 +2,9 @@ module github.com/up9inc/mizu/tap/api
go 1.17
require github.com/google/martian v2.1.0+incompatible
require (
github.com/google/martian v2.1.0+incompatible
github.com/up9inc/mizu/tap/dbgctl v0.0.0
)
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../dbgctl

View File

@@ -3,5 +3,5 @@ package api
type TrafficFilteringOptions struct {
IgnoredUserAgents []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
EnableRedaction bool
}

View File

@@ -10,10 +10,12 @@ type AppStats struct {
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
IgnoredPacketsCount uint64 `json:"ignoredPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
LiveTcpStreams uint64 `json:"liveTcpStreams"`
}
func (as *AppStats) IncMatchedPairs() {
@@ -33,6 +35,10 @@ func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncIgnoredPacketsCount() {
atomic.AddUint64(&as.IgnoredPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
@@ -41,6 +47,14 @@ func (as *AppStats) IncTlsConnectionsCount() {
atomic.AddUint64(&as.TlsConnectionsCount, 1)
}
func (as *AppStats) IncLiveTcpStreams() {
atomic.AddUint64(&as.LiveTcpStreams, 1)
}
func (as *AppStats) DecLiveTcpStreams() {
atomic.AddUint64(&as.LiveTcpStreams, ^uint64(0))
}
func (as *AppStats) UpdateProcessedBytes(size uint64) {
atomic.AddUint64(&as.ProcessedBytes, size)
}
@@ -55,10 +69,12 @@ func (as *AppStats) DumpStats() *AppStats {
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.IgnoredPacketsCount = resetUint64(&as.IgnoredPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
currentAppStats.LiveTcpStreams = as.LiveTcpStreams
return currentAppStats
}

View File

@@ -0,0 +1,15 @@
package dbgctl
import (
"os"
)
var (
MizuTapperDisablePcap bool = os.Getenv("MIZU_DEBUG_DISABLE_PCAP") == "true"
MizuTapperDisableTcpReassembly bool = os.Getenv("MIZU_DEBUG_DISABLE_TCP_REASSEMBLY") == "true"
MizuTapperDisableTcpStream bool = os.Getenv("MIZU_DEBUG_DISABLE_TCP_STREAM") == "true"
MizuTapperDisableDissectors bool = os.Getenv("MIZU_DEBUG_DISABLE_DISSECTORS") == "true"
MizuTapperDisableEmitting bool = os.Getenv("MIZU_DEBUG_DISABLE_EMITTING") == "true"
MizuTapperDisableSending bool = os.Getenv("MIZU_DEBUG_DISABLE_SENDING") == "true"
MizuTapperDisableNonHttpExtensions bool = os.Getenv("MIZU_DEBUG_DISABLE_NON_HTTP_EXTENSSION") == "true"
)

3
tap/dbgctl/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module github.com/up9inc/mizu/tap/dbgctl
go 1.18

View File

@@ -11,7 +11,10 @@ require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl

View File

@@ -3,7 +3,6 @@ package amqp
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -75,14 +74,10 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
var lastMethodFrameMessage Message
for {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
return errors.New("Identified by another protocol")
}
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return nil
return err
}
switch f := frame.(type) {
@@ -90,6 +85,8 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
// drop
case *HeaderFrame:
reader.GetParent().SetProtocol(&protocol)
// start content state
header = f
remaining = int(header.Size)
@@ -107,20 +104,22 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
}
case *BodyFrame:
reader.GetParent().SetProtocol(&protocol)
// continue until terminated
remaining -= len(f.Body)
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
}
case *MethodFrame:
reader.GetParent().SetProtocol(&protocol)
lastMethodFrameMessage = f.Method
switch m := f.Method.(type) {
case *BasicPublish:
@@ -137,7 +136,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicConsume:
@@ -150,7 +148,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver:
@@ -170,7 +167,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ExchangeDeclare:
@@ -184,7 +180,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
NoWait: m.NoWait,
Arguments: m.Arguments,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionStart:
@@ -195,7 +190,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionClose:
@@ -205,7 +199,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
ClassId: m.ClassId,
MethodId: m.MethodId,
}
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
}

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,16 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect10/http/\* expect

View File

@@ -14,8 +14,11 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl

View File

@@ -18,7 +18,7 @@ func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *ap
return
}
if !options.DisableRedaction {
if options.EnableRedaction {
FilterSensitiveData(item, options)
}

View File

@@ -3,7 +3,6 @@ package http
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -144,10 +143,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
http2Assembler = createHTTP2Assembler(b)
}
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol {
return errors.New("Identified by another protocol")
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -200,10 +195,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
}
}
if reader.GetParent().GetProtoIdentifier().Protocol == nil {
return err
}
return nil
}

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,16 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}

View File

@@ -18,7 +18,10 @@ require (
github.com/klauspost/compress v1.14.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl

View File

@@ -3,7 +3,6 @@ package kafka
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"time"
@@ -38,10 +37,6 @@ func (d dissecting) Ping() {
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if reader.GetIsClient() {
_, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
if err != nil {

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,16 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}

View File

@@ -11,7 +11,10 @@ require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/up9inc/mizu/tap/dbgctl v0.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ../../dbgctl

View File

@@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -7,18 +7,16 @@ import (
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
isClosed bool
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
origin: capture,
}
}
@@ -28,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}

View File

@@ -6,23 +6,30 @@ require (
github.com/cilium/ebpf v0.8.0
github.com/go-errors/errors v1.4.2
github.com/google/gopacket v1.1.19
github.com/hashicorp/golang-lru v0.5.4
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/struCoder/pidusage v0.2.1
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
github.com/up9inc/mizu/tap/dbgctl v0.0.0
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
k8s.io/api v0.23.3
)
require (
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
golang.org/x/text v0.3.7 // indirect
@@ -39,3 +46,5 @@ require (
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/tap/api v0.0.0 => ./api
replace github.com/up9inc/mizu/tap/dbgctl v0.0.0 => ./dbgctl

View File

@@ -31,6 +31,8 @@ github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.2 h1:ahHml/yUpnlb96Rp8HCvtYVPY8ZYpxq3g7UYchIYwbs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8=
@@ -117,6 +119,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
@@ -126,11 +130,19 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
github.com/struCoder/pidusage v0.2.1/go.mod h1:bewtP2KUA1TBUyza5+/PCpSQ6sc/H6jJbIKAzqW86BA=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -170,6 +182,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -184,6 +197,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220207234003-57398862261d h1:Bm7BNOQt2Qv7ZqysjeLjgCBanX+88Z/OtdvsrEv1Djc=
golang.org/x/sys v0.0.0-20220207234003-57398862261d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -276,6 +290,5 @@ sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

View File

@@ -16,7 +16,10 @@ import (
"runtime"
"strings"
"time"
"strconv"
"github.com/shirou/gopsutil/cpu"
"github.com/struCoder/pidusage"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
@@ -41,6 +44,7 @@ var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
var ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
// capture
var iface = flag.String("i", "en0", "Interface to read packets from")
@@ -55,7 +59,8 @@ var tls = flag.Bool("tls", false, "Enable TLS tapper")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
HostMode bool
IgnoredPorts []uint16
}
var extensions []*api.Extension // global
@@ -123,6 +128,16 @@ func printPeriodicStats(cleaner *Cleaner) {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
logicalCoreCount, err := cpu.Counts(true)
if err != nil {
logicalCoreCount = -1
}
physicalCoreCount, err := cpu.Counts(false)
if err != nil {
physicalCoreCount = -1
}
for {
<-ticker.C
@@ -139,11 +154,21 @@ func printPeriodicStats(cleaner *Cleaner) {
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
sysInfo, err := pidusage.GetStat(os.Getpid())
if err != nil {
sysInfo = &pidusage.SysInfo{
CPU: -1,
Memory: -1,
}
}
logger.Log.Infof(
"mem: %d, goroutines: %d",
"mem: %d, goroutines: %d, cpu: %f, cores: %d/%d, rss: %f",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
sysInfo.CPU,
logicalCoreCount,
physicalCoreCount,
sysInfo.Memory)
// Since the last print
cleanStats := cleaner.dumpStats()
@@ -193,6 +218,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
logger.Log.Fatal(err)
}
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
return assembler
@@ -267,3 +294,19 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
return &tls
}
func buildIgnoredPortsList(ignoredPorts string) []uint16 {
tmp := strings.Split(ignoredPorts, ",")
result := make([]uint16, len(tmp))
for i, raw := range tmp {
v, err := strconv.Atoi(raw)
if err != nil {
continue
}
result[i] = uint16(v)
}
return result
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/google/gopacket/pcap"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
"github.com/up9inc/mizu/tap/diagnose"
)
@@ -116,6 +117,9 @@ func (source *tcpPacketSource) close() {
}
func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
if dbgctl.MizuTapperDisablePcap {
return
}
logger.Log.Infof("Start reading packets from %v", source.name)
for {

View File

@@ -12,6 +12,7 @@ import (
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
@@ -23,6 +24,7 @@ type tcpAssembler struct {
streamPool *reassembly.StreamPool
streamFactory *tcpStreamFactory
assemblerMutex sync.Mutex
ignoredPorts []uint16
}
// Context
@@ -48,8 +50,8 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v",
maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
@@ -57,6 +59,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
Assembler: assembler,
streamPool: streamPool,
streamFactory: streamFactory,
ignoredPorts: opts.IgnoredPorts,
}
}
@@ -83,14 +86,20 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
diagnose.AppStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
if a.shouldIgnorePort(uint16(tcp.DstPort)) {
diagnose.AppStats.IncIgnoredPacketsCount()
} else {
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
if !dbgctl.MizuTapperDisableTcpReassembly {
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
@@ -132,3 +141,13 @@ func (a *tcpAssembler) waitAndDump() {
logger.Log.Debugf("%s", a.Dump())
a.assemblerMutex.Unlock()
}
func (a *tcpAssembler) shouldIgnorePort(port uint16) bool {
for _, p := range a.ignoredPorts {
if port == p {
return true
}
}
return false
}

View File

@@ -3,12 +3,11 @@ package tap
import (
"bufio"
"io"
"io/ioutil"
"sync"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
)
/* TcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
@@ -17,50 +16,55 @@ import (
* Implements io.Reader interface (Read)
*/
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
progress *api.ReadProgress
captureTime time.Time
parent *tcpStream
packetsSeen uint
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload
msgBuffer []api.TcpReaderDataMsg
msgBufferMaster []api.TcpReaderDataMsg
data []byte
progress *api.ReadProgress
captureTime time.Time
parent *tcpStream
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
func NewTcpReader(ident string, tcpId *api.TcpID, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader {
return &tcpReader{
msgQueue: msgQueue,
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
msgQueue: make(chan api.TcpReaderDataMsg),
progress: &api.ReadProgress{},
ident: ident,
tcpID: tcpId,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
emitter: emitter,
}
}
func (reader *tcpReader) run(options *api.TrafficFilteringOptions, wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(reader)
err := reader.extension.Dissector.Dissect(b, reader, options)
if err != nil {
_, err = io.Copy(ioutil.Discard, reader)
if err != nil {
logger.Log.Errorf("%v", err)
if dbgctl.MizuTapperDisableDissectors {
b := bufio.NewReader(reader)
_, _ = io.ReadAll(b)
return
}
for i, extension := range extensions {
reader.reqResMatcher = reader.parent.reqResMatchers[i]
reader.counterPair = reader.parent.counterPairs[i]
b := bufio.NewReader(reader)
extension.Dissector.Dissect(b, reader, options) //nolint
if reader.isProtocolIdentified() {
break
}
reader.rewind()
}
}
@@ -81,21 +85,60 @@ func (reader *tcpReader) sendMsgIfNotClosed(msg api.TcpReaderDataMsg) {
reader.Unlock()
}
func (reader *tcpReader) isProtocolIdentified() bool {
return reader.parent.protocol != nil
}
func (reader *tcpReader) rewind() {
// Reset the data
reader.data = make([]byte, 0)
// Reset msgBuffer from the master record
reader.parent.Lock()
reader.msgBuffer = make([]api.TcpReaderDataMsg, len(reader.msgBufferMaster))
copy(reader.msgBuffer, reader.msgBufferMaster)
reader.parent.Unlock()
// Reset the read progress
reader.progress.Reset()
}
func (reader *tcpReader) populateData(msg api.TcpReaderDataMsg) {
reader.data = msg.GetBytes()
reader.captureTime = msg.GetTimestamp()
}
func (reader *tcpReader) Read(p []byte) (int, error) {
var msg api.TcpReaderDataMsg
for len(reader.msgBuffer) > 0 && len(reader.data) == 0 {
// Pop first message
if len(reader.msgBuffer) > 1 {
msg, reader.msgBuffer = reader.msgBuffer[0], reader.msgBuffer[1:]
} else {
msg = reader.msgBuffer[0]
reader.msgBuffer = make([]api.TcpReaderDataMsg, 0)
}
// Get the bytes
reader.populateData(msg)
}
ok := true
for ok && len(reader.data) == 0 {
msg, ok = <-reader.msgQueue
if msg != nil {
reader.data = msg.GetBytes()
reader.captureTime = msg.GetTimestamp()
}
reader.populateData(msg)
if len(reader.data) > 0 {
reader.packetsSeen += 1
if !reader.isProtocolIdentified() {
reader.msgBufferMaster = append(
reader.msgBufferMaster,
msg,
)
}
}
}
if !ok || len(reader.data) == 0 {
return 0, io.EOF
}
@@ -142,7 +185,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter {
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -6,7 +6,6 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
)
@@ -16,10 +15,10 @@ type tcpReassemblyStream struct {
fsmerr bool
optchecker reassembly.TCPOptionCheck
isDNS bool
tcpStream api.TcpStream
tcpStream *tcpStream
}
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream api.TcpStream) reassembly.Stream {
func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream *tcpStream) reassembly.Stream {
return &tcpReassemblyStream{
ident: ident,
tcpState: reassembly.NewTCPSimpleFSM(fsmOptions),
@@ -139,17 +138,10 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
// This channel is read by an tcpReader object
diagnose.AppStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
stream := t.tcpStream.(*tcpStream)
if dir == reassembly.TCPDirClientToServer {
for i := range stream.getClients() {
reader := stream.getClient(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
t.tcpStream.client.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
} else {
for i := range stream.getServers() {
reader := stream.getServer(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
t.tcpStream.server.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
}
}
}
@@ -157,7 +149,7 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
func (t *tcpReassemblyStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
if t.tcpStream.GetIsTapTarget() && !t.tcpStream.GetIsClosed() {
t.tcpStream.(*tcpStream).close()
t.tcpStream.close()
}
// do not remove the connection to allow last ACK
return false

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/dbgctl"
)
/* It's a connection (bidirectional)
@@ -13,25 +14,26 @@ import (
* In our implementation, we pass information from ReassembledSG to the TcpReader through a shared channel.
*/
type tcpStream struct {
id int64
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
clients []*tcpReader
servers []*tcpReader
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
createdAt time.Time
streamsMap api.TcpStreamMap
id int64
isClosed bool
protocol *api.Protocol
isTapTarget bool
client *tcpReader
server *tcpReader
origin api.Capture
counterPairs []*api.CounterPair
reqResMatchers []api.RequestResponseMatcher
createdAt time.Time
streamsMap api.TcpStreamMap
sync.Mutex
}
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream {
return &tcpStream{
isTapTarget: isTapTarget,
protoIdentifier: &api.ProtoIdentifier{},
streamsMap: streamsMap,
origin: capture,
isTapTarget: isTapTarget,
streamsMap: streamsMap,
origin: capture,
createdAt: time.Now(),
}
}
@@ -55,38 +57,12 @@ func (t *tcpStream) close() {
t.streamsMap.Delete(t.id)
for i := range t.clients {
reader := t.clients[i]
reader.close()
}
for i := range t.servers {
reader := t.servers[i]
reader.close()
}
t.client.close()
t.server.close()
}
func (t *tcpStream) addClient(reader *tcpReader) {
t.clients = append(t.clients, reader)
}
func (t *tcpStream) addServer(reader *tcpReader) {
t.servers = append(t.servers, reader)
}
func (t *tcpStream) getClients() []*tcpReader {
return t.clients
}
func (t *tcpStream) getServers() []*tcpReader {
return t.servers
}
func (t *tcpStream) getClient(index int) *tcpReader {
return t.clients[index]
}
func (t *tcpStream) getServer(index int) *tcpReader {
return t.servers[index]
func (t *tcpStream) addCounterPair(counterPair *api.CounterPair) {
t.counterPairs = append(t.counterPairs, counterPair)
}
func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
@@ -94,44 +70,27 @@ func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
t.protocol = protocol
// Clean the buffers
t.Lock()
defer t.Unlock()
if t.protoIdentifier.IsClosedOthers {
return
}
t.protoIdentifier.Protocol = protocol
for i := range t.clients {
reader := t.clients[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.close()
}
}
for i := range t.servers {
reader := t.servers[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.close()
}
}
t.protoIdentifier.IsClosedOthers = true
t.client.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.server.msgBufferMaster = make([]api.TcpReaderDataMsg, 0)
t.Unlock()
}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}
func (t *tcpStream) GetIsTapTarget() bool {
if dbgctl.MizuTapperDisableTcpStream {
return false
}
return t.isTapTarget
}

View File

@@ -3,7 +3,6 @@ package tap
import (
"fmt"
"sync"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
@@ -62,62 +61,50 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
if stream.GetIsTapTarget() {
stream.setId(factory.streamsMap.NextId())
for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
stream.addReqResMatcher(reqResMatcher)
for _, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.addClient(
NewTcpReader(
make(chan api.TcpReaderDataMsg),
&api.ReadProgress{},
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
time.Time{},
stream,
true,
props.isOutgoing,
extension,
factory.emitter,
counterPair,
reqResMatcher,
),
)
stream.addServer(
NewTcpReader(
make(chan api.TcpReaderDataMsg),
&api.ReadProgress{},
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
time.Time{},
stream,
false,
props.isOutgoing,
extension,
factory.emitter,
counterPair,
reqResMatcher,
),
)
stream.addCounterPair(counterPair)
factory.streamsMap.Store(stream.getId(), stream)
factory.wg.Add(2)
go stream.getClient(i).run(filteringOptions, &factory.wg)
go stream.getServer(i).run(filteringOptions, &factory.wg)
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
stream.addReqResMatcher(reqResMatcher)
}
stream.client = NewTcpReader(
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
stream,
true,
props.isOutgoing,
factory.emitter,
)
stream.server = NewTcpReader(
fmt.Sprintf("%s %s", net, transport),
&api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
stream,
false,
props.isOutgoing,
factory.emitter,
)
factory.streamsMap.Store(stream.getId(), stream)
factory.wg.Add(2)
go stream.client.run(filteringOptions, &factory.wg)
go stream.server.run(filteringOptions, &factory.wg)
}
return reassemblyStream
}

View File

@@ -28,10 +28,12 @@ func (streamMap *tcpStreamMap) Range(f func(key, value interface{}) bool) {
func (streamMap *tcpStreamMap) Store(key, value interface{}) {
streamMap.streams.Store(key, value)
diagnose.AppStats.IncLiveTcpStreams()
}
func (streamMap *tcpStreamMap) Delete(key interface{}) {
streamMap.streams.Delete(key)
diagnose.AppStats.DecLiveTcpStreams()
}
func (streamMap *tcpStreamMap) NextId() int64 {
@@ -57,7 +59,7 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() {
return true
}
if stream.protoIdentifier.Protocol == nil {
if stream.protocol == nil {
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
stream.close()
diagnose.AppStats.IncDroppedTcpStreams()

View File

@@ -188,8 +188,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key
}
stream := &tlsStream{
reader: reader,
protoIdentifier: &api.ProtoIdentifier{},
reader: reader,
}
streamsMap.Store(streamsMap.NextId(), stream)

View File

@@ -94,7 +94,3 @@ func (r *tlsReader) GetEmitter() api.Emitter {
func (r *tlsReader) GetIsClosed() bool {
return false
}
func (r *tlsReader) GetExtension() *api.Extension {
return r.extension
}

View File

@@ -3,20 +3,16 @@ package tlstapper
import "github.com/up9inc/mizu/tap/api"
type tlsStream struct {
reader *tlsReader
protoIdentifier *api.ProtoIdentifier
reader *tlsReader
protocol *api.Protocol
}
func (t *tlsStream) GetOrigin() api.Capture {
return api.Ebpf
}
func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
t.protoIdentifier.Protocol = protocol
t.protocol = protocol
}
func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {

View File

@@ -91,4 +91,4 @@
"files": [
"dist"
]
}
}

View File

@@ -1,5 +1,5 @@
import styles from "./EntrySections.module.sass";
import React, { useState } from "react";
import React, { useCallback, useEffect, useMemo, useState } from "react";
import { SyntaxHighlighter } from "../../UI/SyntaxHighlighter/index";
import CollapsibleContainer from "../../UI/CollapsibleContainer";
import FancyTextDisplay from "../../UI/FancyTextDisplay";
@@ -8,6 +8,7 @@ import Checkbox from "../../UI/Checkbox";
import ProtobufDecoder from "protobuf-decoder";
import { default as jsonBeautify } from "json-beautify";
import { default as xmlBeautify } from "xml-formatter";
import { Utils } from "../../../helpers/Utils"
interface EntryViewLineProps {
label: string;
@@ -101,6 +102,12 @@ export const EntrySectionContainer: React.FC<EntrySectionContainerProps> = ({ ti
</CollapsibleContainer>
}
const MAXIMUM_BYTES_TO_FORMAT = 1000000; // The maximum of chars to highlight in body, in case the response can be megabytes
const jsonLikeFormats = ['json', 'yaml', 'yml'];
const xmlLikeFormats = ['xml', 'html'];
const protobufFormats = ['application/grpc'];
const supportedFormats = jsonLikeFormats.concat(xmlLikeFormats, protobufFormats);
interface EntryBodySectionProps {
title: string,
content: any,
@@ -118,21 +125,21 @@ export const EntryBodySection: React.FC<EntryBodySectionProps> = ({
contentType,
selector,
}) => {
const MAXIMUM_BYTES_TO_FORMAT = 1000000; // The maximum of chars to highlight in body, in case the response can be megabytes
const jsonLikeFormats = ['json', 'yaml', 'yml'];
const xmlLikeFormats = ['xml', 'html'];
const protobufFormats = ['application/grpc'];
const supportedFormats = jsonLikeFormats.concat(xmlLikeFormats, protobufFormats);
const [isPretty, setIsPretty] = useState(true);
const [showLineNumbers, setShowLineNumbers] = useState(true);
const [showLineNumbers, setShowLineNumbers] = useState(false);
const [decodeBase64, setDecodeBase64] = useState(true);
const isBase64Encoding = encoding === 'base64';
const supportsPrettying = supportedFormats.some(format => contentType?.indexOf(format) > -1);
const [isDecodeGrpc, setIsDecodeGrpc] = useState(true);
const [isLineNumbersGreaterThenOne, setIsLineNumbersGreaterThenOne] = useState(true);
const formatTextBody = (body: any): string => {
useEffect(() => {
(isLineNumbersGreaterThenOne && isPretty) && setShowLineNumbers(true);
!isLineNumbersGreaterThenOne && setShowLineNumbers(false);
}, [isLineNumbersGreaterThenOne, isPretty])
const formatTextBody = useCallback((body: any): string => {
if (!decodeBase64) return body;
const chunk = body.slice(0, MAXIMUM_BYTES_TO_FORMAT);
@@ -158,15 +165,24 @@ export const EntryBodySection: React.FC<EntryBodySectionProps> = ({
return jsonBeautify(protobufDecoded, null, 2, 80);
}
} catch (error) {
if (String(error).includes("More than one message in")){
if(isDecodeGrpc)
setIsDecodeGrpc(false);
if (String(error).includes("More than one message in")) {
if (isDecodeGrpc)
setIsDecodeGrpc(false);
} else if (String(error).includes("Failed to parse")) {
console.warn(error);
} else {
console.error(error);
}
}
return bodyBuf;
}
}, [isPretty, contentType, isDecodeGrpc, decodeBase64, isBase64Encoding])
const formattedText = useMemo(() => formatTextBody(content), [formatTextBody, content]);
useEffect(() => {
const lineNumbers = Utils.lineNumbersInString(formattedText);
setIsLineNumbersGreaterThenOne(lineNumbers > 1);
}, [isPretty, content, showLineNumbers, formattedText]);
return <React.Fragment>
{content && content?.length > 0 && <EntrySectionContainer
@@ -181,18 +197,19 @@ export const EntryBodySection: React.FC<EntryBodySectionProps> = ({
{supportsPrettying && <span style={{ marginLeft: '.2rem' }}>Pretty</span>}
<div style={{ paddingTop: 3, paddingLeft: supportsPrettying ? 20 : 0 }}>
<Checkbox checked={showLineNumbers} onToggle={() => { setShowLineNumbers(!showLineNumbers) }} />
<Checkbox checked={showLineNumbers} onToggle={() => { setShowLineNumbers(!showLineNumbers) }} disabled={!isLineNumbersGreaterThenOne || !decodeBase64} />
</div>
<span style={{ marginLeft: '.2rem' }}>Line numbers</span>
{isBase64Encoding && <div style={{ paddingTop: 3, paddingLeft: 20 }}>
<span style={{ marginLeft: '.2rem' }}>Line numbers</span>
{isBase64Encoding && <div style={{ paddingTop: 3, paddingLeft: (isLineNumbersGreaterThenOne || supportsPrettying) ? 20 : 0 }}>
<Checkbox checked={decodeBase64} onToggle={() => { setDecodeBase64(!decodeBase64) }} />
</div>}
{isBase64Encoding && <span style={{ marginLeft: '.2rem' }}>Decode Base64</span>}
{!isDecodeGrpc && <span style={{ fontSize: '12px', color: '#DB2156', marginLeft: '.8rem' }}>More than one message in protobuf payload is not supported</span>}
</div>
</div>
<SyntaxHighlighter
code={formatTextBody(content)}
<SyntaxHighlighter
code={formattedText}
showLineNumbers={showLineNumbers}
/>

View File

@@ -296,7 +296,7 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode, name
query={`dst.ip == "${entry.dst.ip}"`}
displayIconOnMouseOver={true}
flipped={false}
iconStyle={{marginTop: "28px"}}
iconStyle={{marginTop: "30px", marginLeft: "-2px",right: "35px", position: "relative"}}
>
<span
className={`${styles.tcpInfo} ${styles.ip}`}

View File

@@ -54,7 +54,7 @@ const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTool
{flipped && addButton}
{children}
{!flipped && addButton}
{useTooltip && showTooltip && <span data-cy={"QueryableTooltip"} className={QueryableStyle.QueryableTooltip} style={tooltipStyle}>{query}</span>}
{useTooltip && showTooltip && (query !== "") && <span data-cy={"QueryableTooltip"} className={QueryableStyle.QueryableTooltip} style={tooltipStyle}>{query}</span>}
</div>
);
};

View File

@@ -22,8 +22,8 @@ export const StatusBar: React.FC<StatusBarProps> = ({isDemoBannerView, disabled}
const {uniqueNamespaces, amountOfPods, amountOfTappedPods, amountOfUntappedPods} = useRecoilValue(tappingStatusDetails);
return <div style={{opacity: disabled ? 0.4 : 1}} className={`${isDemoBannerView ? `${style.banner}` : ''} ${style.statusBar} ${(expandedBar && !disabled ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)} data-cy="expandedStatusBar">
<div className={style.podsCount}>
{tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
{!disabled && tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
{disabled && <Tooltip title={"Tapping status is not updated when streaming is paused"} isSimple><img src={warningIcon} alt="warning"/></Tooltip>}
<span className={style.podsCountText} data-cy="podsCountText">
{`Tapping ${amountOfUntappedPods > 0 ? amountOfTappedPods + " / " + amountOfPods : amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
</span>

View File

@@ -1,4 +1,4 @@
import React from 'react';
import React, { useEffect, useState } from 'react';
import Lowlight from 'react-lowlight'
import 'highlight.js/styles/atom-one-light.css'
import styles from './index.module.sass';
@@ -30,18 +30,23 @@ interface Props {
}
export const SyntaxHighlighter: React.FC<Props> = ({
code,
showLineNumbers = false,
language = null
}) => {
const markers = showLineNumbers ? code.split("\n").map((item, i) => {
return {
line: i + 1,
className: styles.hljsMarkerLine
}
}) : [];
code,
showLineNumbers = false,
language = null,
}) => {
const [markers, setMarkers] = useState([])
return <div style={{fontSize: ".75rem"}} className={styles.highlighterContainer}><Lowlight language={language ? language : ""} value={code} markers={markers}/></div>;
useEffect(() => {
const newMarkers = code.split("\n").map((item, i) => {
return {
line: i + 1,
className: styles.hljsMarkerLine
}
});
setMarkers(showLineNumbers ? newMarkers : []);
}, [showLineNumbers, code])
return <div style={{ fontSize: ".75rem" }} className={styles.highlighterContainer}><Lowlight language={language ? language : ""} value={code} markers={markers} /></div>;
};
export default SyntaxHighlighter;

View File

@@ -50,10 +50,9 @@
td
color: $light-gray
padding: 10px
font-size: 11px
font-size: 15px
font-weight: 600
padding-top: 5px
padding-bottom: 5px
padding: 10px
.nowrap
white-space: nowrap

View File

@@ -3,4 +3,5 @@ const IP_ADDRESS_REGEX = /([0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3})(:([0-9]{
export class Utils {
static isIpAddress = (address: string): boolean => IP_ADDRESS_REGEX.test(address)
}
static lineNumbersInString = (code:string): number => code.split("\n").length;
}