mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-19 04:20:01 +00:00
Compare commits
11 Commits
29.0-dev16
...
29.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af61c69fb6 | ||
|
|
1cbd9cb199 | ||
|
|
23c1b66855 | ||
|
|
f5fa9ff270 | ||
|
|
4159938cea | ||
|
|
5614e153f3 | ||
|
|
5e90d67b0e | ||
|
|
dd430c31d5 | ||
|
|
94dfa68858 | ||
|
|
50c0062db4 | ||
|
|
720969bbe6 |
35
.github/workflows/build.yml
vendored
35
.github/workflows/build.yml
vendored
@@ -15,15 +15,23 @@ jobs:
|
||||
name: CLI executable build
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 2
|
||||
|
||||
- name: Check modified files
|
||||
id: modified_files
|
||||
run: devops/check_modified_files.sh cli/
|
||||
|
||||
- name: Set up Go 1.17
|
||||
if: steps.modified_files.outputs.matched == 'true'
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Build CLI
|
||||
if: steps.modified_files.outputs.matched == 'true'
|
||||
run: make cli
|
||||
|
||||
build-agent:
|
||||
@@ -32,6 +40,23 @@ jobs:
|
||||
steps:
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 2
|
||||
|
||||
- name: Check modified files
|
||||
id: modified_files
|
||||
run: devops/check_modified_files.sh agent/ shared/ tap/ ui/ Dockerfile
|
||||
|
||||
- name: Build Agent
|
||||
run: make agent-docker
|
||||
- name: Set up Docker Buildx
|
||||
if: steps.modified_files.outputs.matched == 'true'
|
||||
uses: docker/setup-buildx-action@v1
|
||||
|
||||
- name: Build
|
||||
uses: docker/build-push-action@v2
|
||||
if: steps.modified_files.outputs.matched == 'true'
|
||||
with:
|
||||
context: .
|
||||
push: false
|
||||
tags: up9inc/mizu:devlatest
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
||||
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -59,6 +59,7 @@ jobs:
|
||||
tags: |
|
||||
type=raw,${{ steps.versioning.outputs.version }}
|
||||
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
|
||||
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
|
||||
flavor: |
|
||||
latest=auto
|
||||
prefix=
|
||||
@@ -145,6 +146,7 @@ jobs:
|
||||
tags: |
|
||||
type=raw,${{ steps.versioning.outputs.version }}
|
||||
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
|
||||
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
|
||||
flavor: |
|
||||
latest=auto
|
||||
prefix=
|
||||
@@ -208,7 +210,7 @@ jobs:
|
||||
tags: |
|
||||
type=raw,${{ steps.versioning.outputs.version }}
|
||||
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
|
||||
|
||||
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
|
||||
- name: Login to Docker Hub
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
|
||||
23
.github/workflows/test.yml
vendored
23
.github/workflows/test.yml
vendored
@@ -19,14 +19,16 @@ jobs:
|
||||
name: Unit Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 2
|
||||
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Install libpcap
|
||||
shell: bash
|
||||
run: |
|
||||
@@ -40,16 +42,31 @@ jobs:
|
||||
- name: 'Set up Cloud SDK'
|
||||
uses: 'google-github-actions/setup-gcloud@v0'
|
||||
|
||||
- name: Check CLI modified files
|
||||
id: cli_modified_files
|
||||
run: devops/check_modified_files.sh cli/
|
||||
|
||||
- name: CLI Test
|
||||
if: github.event_name == 'push' || steps.cli_modified_files.outputs.matched == 'true'
|
||||
run: make test-cli
|
||||
|
||||
- name: Check Agent modified files
|
||||
id: agent_modified_files
|
||||
run: devops/check_modified_files.sh agent/
|
||||
|
||||
- name: Agent Test
|
||||
if: github.event_name == 'push' || steps.agent_modified_files.outputs.matched == 'true'
|
||||
run: make test-agent
|
||||
|
||||
- name: Shared Test
|
||||
run: make test-shared
|
||||
|
||||
- name: Check extensions modified files
|
||||
id: ext_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/
|
||||
|
||||
- name: Extensions Test
|
||||
if: github.event_name == 'push' || steps.ext_modified_files.outputs.matched == 'true'
|
||||
run: make test-extensions
|
||||
|
||||
- name: Upload coverage to Codecov
|
||||
|
||||
@@ -65,14 +65,14 @@ export function checkThatAllEntriesShown() {
|
||||
}
|
||||
|
||||
export function checkFilterByMethod(funcDict) {
|
||||
const {protocol, method, summary, hugeMizu} = funcDict;
|
||||
const summaryDict = getSummeryDict(summary);
|
||||
const methodDict = getMethodDict(method);
|
||||
const {protocol, method, methodQuery, summary, summaryQuery} = funcDict;
|
||||
const summaryDict = getSummaryDict(summary, summaryQuery);
|
||||
const methodDict = getMethodDict(method, methodQuery);
|
||||
const protocolDict = getProtocolDict(protocol.name, protocol.text);
|
||||
|
||||
it(`Testing the method: ${method}`, function () {
|
||||
// applying filter
|
||||
cy.get('.w-tc-editor-text').clear().type(`method == "${method}"`);
|
||||
cy.get('.w-tc-editor-text').clear().type(methodQuery);
|
||||
cy.get('[type="submit"]').click();
|
||||
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
|
||||
|
||||
@@ -121,7 +121,7 @@ function resizeIfNeeded(entriesLen) {
|
||||
function deepCheck(generalDict, protocolDict, methodDict, entry) {
|
||||
const entryNum = getEntryNumById(entry.id);
|
||||
const {summary, value} = generalDict;
|
||||
const summaryDict = getSummeryDict(summary);
|
||||
const summaryDict = getSummaryDict(summary);
|
||||
|
||||
leftOnHoverCheck(entryNum, methodDict.pathLeft, methodDict.expectedOnHover);
|
||||
leftOnHoverCheck(entryNum, protocolDict.pathLeft, protocolDict.expectedOnHover);
|
||||
@@ -149,13 +149,13 @@ function deepCheck(generalDict, protocolDict, methodDict, entry) {
|
||||
}
|
||||
}
|
||||
|
||||
function getSummeryDict(summary) {
|
||||
if (summary) {
|
||||
function getSummaryDict(value, query) {
|
||||
if (value) {
|
||||
return {
|
||||
pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
|
||||
pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
|
||||
expectedText: summary,
|
||||
expectedOnHover: `summary == "${summary}"`
|
||||
expectedText: value,
|
||||
expectedOnHover: query
|
||||
};
|
||||
}
|
||||
else {
|
||||
@@ -163,12 +163,12 @@ function getSummeryDict(summary) {
|
||||
}
|
||||
}
|
||||
|
||||
function getMethodDict(method) {
|
||||
function getMethodDict(value, query) {
|
||||
return {
|
||||
pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
|
||||
pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
|
||||
expectedText: method,
|
||||
expectedOnHover: `method == "${method}"`
|
||||
expectedText: value,
|
||||
expectedOnHover: query
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -9,41 +9,53 @@ const rabbitProtocolDetails = {name: 'AMQP', text: 'Advanced Message Queuing Pro
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'exchange declare',
|
||||
methodQuery: 'request.method == "exchange declare"',
|
||||
summary: 'exchange',
|
||||
summaryQuery: 'request.exchange == "exchange"',
|
||||
value: null
|
||||
});
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'queue declare',
|
||||
methodQuery: 'request.method == "queue declare"',
|
||||
summary: 'queue',
|
||||
summaryQuery: 'request.queue == "queue"',
|
||||
value: null
|
||||
});
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'queue bind',
|
||||
methodQuery: 'request.method == "queue bind"',
|
||||
summary: 'queue',
|
||||
summaryQuery: 'request.queue == "queue"',
|
||||
value: null
|
||||
});
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'basic publish',
|
||||
methodQuery: 'request.method == "basic publish"',
|
||||
summary: 'exchange',
|
||||
summaryQuery: 'request.exchange == "exchange"',
|
||||
value: {tab: valueTabs.request, regex: /^message$/mg}
|
||||
});
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'basic consume',
|
||||
methodQuery: 'request.method == "basic consume"',
|
||||
summary: 'queue',
|
||||
summaryQuery: 'request.queue == "queue"',
|
||||
value: null
|
||||
});
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: rabbitProtocolDetails,
|
||||
method: 'basic deliver',
|
||||
methodQuery: 'request.method == "basic deliver"',
|
||||
summary: 'exchange',
|
||||
summaryQuery: 'request.queue == "exchange"',
|
||||
value: {tab: valueTabs.request, regex: /^message$/mg}
|
||||
});
|
||||
|
||||
@@ -9,34 +9,44 @@ const redisProtocolDetails = {name: 'redis', text: 'Redis Serialization Protocol
|
||||
checkFilterByMethod({
|
||||
protocol: redisProtocolDetails,
|
||||
method: 'PING',
|
||||
methodQuery: 'request.command == "PING"',
|
||||
summary: null,
|
||||
summaryQuery: '',
|
||||
value: null
|
||||
})
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: redisProtocolDetails,
|
||||
method: 'SET',
|
||||
methodQuery: 'request.command == "SET"',
|
||||
summary: 'key',
|
||||
summaryQuery: 'request.key == "key"',
|
||||
value: {tab: valueTabs.request, regex: /^\[value, keepttl]$/mg}
|
||||
})
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: redisProtocolDetails,
|
||||
method: 'EXISTS',
|
||||
methodQuery: 'request.command == "EXISTS"',
|
||||
summary: 'key',
|
||||
summaryQuery: 'request.key == "key"',
|
||||
value: {tab: valueTabs.response, regex: /^1$/mg}
|
||||
})
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: redisProtocolDetails,
|
||||
method: 'GET',
|
||||
methodQuery: 'request.command == "GET"',
|
||||
summary: 'key',
|
||||
summaryQuery: 'request.key == "key"',
|
||||
value: {tab: valueTabs.response, regex: /^value$/mg}
|
||||
})
|
||||
|
||||
checkFilterByMethod({
|
||||
protocol: redisProtocolDetails,
|
||||
method: 'DEL',
|
||||
methodQuery: 'request.command == "DEL"',
|
||||
summary: 'key',
|
||||
summaryQuery: 'request.key == "key"',
|
||||
value: {tab: valueTabs.response, regex: /^1$|^0$/mg}
|
||||
})
|
||||
|
||||
@@ -113,7 +113,7 @@ if (Cypress.env('shouldCheckSrcAndDest')) {
|
||||
}
|
||||
|
||||
checkFilter({
|
||||
name: 'method == "GET"',
|
||||
name: 'request.method == "GET"',
|
||||
leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
|
||||
leftSideExpectedText: 'GET',
|
||||
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
|
||||
@@ -122,7 +122,7 @@ checkFilter({
|
||||
});
|
||||
|
||||
checkFilter({
|
||||
name: 'summary == "/get"',
|
||||
name: 'request.path == "/get"',
|
||||
leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
|
||||
leftSideExpectedText: '/get',
|
||||
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
|
||||
@@ -139,7 +139,7 @@ checkFilter({
|
||||
applyByEnter: false
|
||||
});
|
||||
|
||||
checkFilterNoResults('method == "POST"');
|
||||
checkFilterNoResults('request.method == "POST"');
|
||||
|
||||
function checkFilterNoResults(filterName) {
|
||||
it(`checking the filter: ${filterName}. Expecting no results`, function () {
|
||||
|
||||
@@ -17,6 +17,12 @@ import (
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var extensionsMap map[string]*tapApi.Extension // global
|
||||
|
||||
func InitExtensionsMap(ref map[string]*tapApi.Extension) {
|
||||
extensionsMap = ref
|
||||
}
|
||||
|
||||
type EventHandlers interface {
|
||||
WebSocketConnect(socketId int, isTapper bool)
|
||||
WebSocketDisconnect(socketId int, isTapper bool)
|
||||
@@ -165,7 +171,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
if params.EnableFullEntries {
|
||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||
} else {
|
||||
base := tapApi.Summarize(entry)
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
||||
}
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ func LoadExtensions() {
|
||||
})
|
||||
|
||||
controllers.InitExtensionsMap(ExtensionsMap)
|
||||
api.InitExtensionsMap(ExtensionsMap)
|
||||
}
|
||||
|
||||
func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {
|
||||
|
||||
@@ -77,7 +77,8 @@ func GetEntries(c *gin.Context) {
|
||||
return // exit
|
||||
}
|
||||
|
||||
base := tapApi.Summarize(entry)
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
|
||||
dataSlice = append(dataSlice, base)
|
||||
}
|
||||
@@ -123,6 +124,7 @@ func GetEntry(c *gin.Context) {
|
||||
}
|
||||
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
|
||||
|
||||
var rules []map[string]interface{}
|
||||
@@ -142,6 +144,7 @@ func GetEntry(c *gin.Context) {
|
||||
Representation: string(representation),
|
||||
BodySize: bodySize,
|
||||
Data: entry,
|
||||
Base: base,
|
||||
Rules: rules,
|
||||
IsRulesEnabled: isRulesEnabled,
|
||||
})
|
||||
|
||||
@@ -80,11 +80,7 @@ type httpEntry struct {
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
Request map[string]interface{} `json:"request"`
|
||||
Response map[string]interface{} `json:"response"`
|
||||
Summary string `json:"summary"`
|
||||
Method string `json:"method"`
|
||||
Status int `json:"status"`
|
||||
ElapsedTime int64 `json:"elapsedTime"`
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
func (client *client) PushEntry(entry *api.Entry) {
|
||||
@@ -103,11 +99,7 @@ func (client *client) PushEntry(entry *api.Entry) {
|
||||
CreatedAt: entry.StartTime,
|
||||
Request: entry.Request,
|
||||
Response: entry.Response,
|
||||
Summary: entry.Summary,
|
||||
Method: entry.Method,
|
||||
Status: entry.Status,
|
||||
ElapsedTime: entry.ElapsedTime,
|
||||
Path: entry.Path,
|
||||
}
|
||||
|
||||
entryJson, err := json.Marshal(entryToPush)
|
||||
|
||||
@@ -12,10 +12,6 @@ import (
|
||||
"github.com/up9inc/mizu/tap"
|
||||
)
|
||||
|
||||
func GetEntry(r *tapApi.Entry, v tapApi.DataUnmarshaler) error {
|
||||
return v.UnmarshalData(r)
|
||||
}
|
||||
|
||||
type TapConfig struct {
|
||||
TappedNamespaces map[string]bool `json:"tappedNamespaces"`
|
||||
}
|
||||
|
||||
@@ -4,11 +4,10 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"github.com/up9inc/mizu/cli/utils"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared/kubernetes"
|
||||
@@ -59,7 +58,7 @@ func (provider *Provider) TestConnection() error {
|
||||
|
||||
func (provider *Provider) isReachable() (bool, error) {
|
||||
echoUrl := fmt.Sprintf("%s/echo", provider.url)
|
||||
if _, err := provider.get(echoUrl); err != nil {
|
||||
if _, err := utils.Get(echoUrl, provider.client); err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
return true, nil
|
||||
@@ -72,7 +71,7 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
|
||||
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapper status %w", err)
|
||||
} else {
|
||||
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
|
||||
@@ -89,7 +88,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
if jsonValue, err := json.Marshal(podInfos); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
|
||||
@@ -101,7 +100,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
|
||||
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
|
||||
|
||||
response, requestErr := provider.get(generalStatsUrl)
|
||||
response, requestErr := utils.Get(generalStatsUrl, provider.client)
|
||||
if requestErr != nil {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
|
||||
}
|
||||
@@ -126,7 +125,7 @@ func (provider *Provider) GetVersion() (string, error) {
|
||||
Method: http.MethodGet,
|
||||
URL: versionUrl,
|
||||
}
|
||||
statusResp, err := provider.do(req)
|
||||
statusResp, err := utils.Do(req, provider.client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -139,40 +138,3 @@ func (provider *Provider) GetVersion() (string, error) {
|
||||
|
||||
return versionResponse.Ver, nil
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) get(url string) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Get(url))
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Post(url, contentType, body))
|
||||
}
|
||||
|
||||
// When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
|
||||
return provider.checkError(provider.client.Do(req))
|
||||
}
|
||||
|
||||
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
|
||||
if (errInOperation != nil) {
|
||||
return response, errInOperation
|
||||
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
|
||||
} else if response.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
response.Body.Close()
|
||||
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
if err != nil {
|
||||
return response, err
|
||||
}
|
||||
|
||||
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
|
||||
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
42
cli/bucket/provider.go
Normal file
42
cli/bucket/provider.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package bucket
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/utils"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Provider struct {
|
||||
url string
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
const DefaultTimeout = 2 * time.Second
|
||||
|
||||
func NewProvider(url string, timeout time.Duration) *Provider {
|
||||
return &Provider{
|
||||
url: url,
|
||||
client: &http.Client{
|
||||
Timeout: timeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) GetInstallTemplate(templateName string) (string, error) {
|
||||
url := fmt.Sprintf("%s/%v", provider.url, templateName)
|
||||
response, err := utils.Get(url, provider.client)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
defer response.Body.Close()
|
||||
|
||||
installTemplate, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(installTemplate), nil
|
||||
}
|
||||
@@ -36,8 +36,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
provider := apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := provider.TestConnection(); err != nil {
|
||||
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
|
||||
if err := httpServer.Shutdown(ctx); err != nil {
|
||||
logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
|
||||
@@ -51,8 +51,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiProvider.TestConnection(); err != nil {
|
||||
provider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := provider.TestConnection(); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
||||
cancel()
|
||||
return
|
||||
|
||||
@@ -3,7 +3,6 @@ package cmd
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
var installCmd = &cobra.Command{
|
||||
@@ -11,14 +10,7 @@ var installCmd = &cobra.Command{
|
||||
Short: "Installs mizu components",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("install", nil)
|
||||
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
|
||||
|
||||
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
|
||||
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
|
||||
|
||||
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
|
||||
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace\n")
|
||||
|
||||
runMizuInstall()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
19
cli/cmd/installRunner.go
Normal file
19
cli/cmd/installRunner.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/bucket"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func runMizuInstall() {
|
||||
bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
|
||||
installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed getting install template, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Print(installTemplate)
|
||||
}
|
||||
@@ -23,6 +23,7 @@ const (
|
||||
type ConfigStruct struct {
|
||||
Tap configStructs.TapConfig `yaml:"tap"`
|
||||
Check configStructs.CheckConfig `yaml:"check"`
|
||||
Install configStructs.InstallConfig `yaml:"install"`
|
||||
Version configStructs.VersionConfig `yaml:"version"`
|
||||
View configStructs.ViewConfig `yaml:"view"`
|
||||
Logs configStructs.LogsConfig `yaml:"logs"`
|
||||
|
||||
6
cli/config/configStructs/installConfig.go
Normal file
6
cli/config/configStructs/installConfig.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package configStructs
|
||||
|
||||
type InstallConfig struct {
|
||||
TemplateUrl string `yaml:"template-url" default:"https://storage.googleapis.com/static.up9.io/mizu/helm-template"`
|
||||
TemplateName string `yaml:"template-name" default:"helm-template.yaml"`
|
||||
}
|
||||
47
cli/utils/httpUtils.go
Normal file
47
cli/utils/httpUtils.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Get - When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func Get(url string, client *http.Client) (*http.Response, error) {
|
||||
return checkError(client.Get(url))
|
||||
}
|
||||
|
||||
// Post - When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func Post(url, contentType string, body io.Reader, client *http.Client) (*http.Response, error) {
|
||||
return checkError(client.Post(url, contentType, body))
|
||||
}
|
||||
|
||||
// Do - When err is nil, resp always contains a non-nil resp.Body.
|
||||
// Caller should close resp.Body when done reading from it.
|
||||
func Do(req *http.Request, client *http.Client) (*http.Response, error) {
|
||||
return checkError(client.Do(req))
|
||||
}
|
||||
|
||||
func checkError(response *http.Response, errInOperation error) (*http.Response, error) {
|
||||
if errInOperation != nil {
|
||||
return response, errInOperation
|
||||
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
|
||||
} else if response.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
response.Body.Close()
|
||||
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
if err != nil {
|
||||
return response, err
|
||||
}
|
||||
|
||||
errorMsg := strings.ReplaceAll(string(body), "\n", ";")
|
||||
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
|
||||
}
|
||||
|
||||
return response, nil
|
||||
}
|
||||
45
devops/check_modified_files.sh
Executable file
45
devops/check_modified_files.sh
Executable file
@@ -0,0 +1,45 @@
|
||||
#!/bin/bash
|
||||
paths_arr=( "$@" )
|
||||
|
||||
printf "\n========== List modified files ==========\n"
|
||||
echo "$(git diff --name-only HEAD^ HEAD)"
|
||||
|
||||
printf "\n========== List paths to match and check existence ==========\n"
|
||||
for path in ${paths_arr[*]}
|
||||
do
|
||||
if [ -f "$path" ] || [ -d "$path" ]; then
|
||||
echo "$path - found"
|
||||
else
|
||||
echo "$path - does not found - exiting with failure"
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
printf "\n========== Check paths of modified files ==========\n"
|
||||
git diff --name-only HEAD^ HEAD > files.txt
|
||||
matched=false
|
||||
while IFS= read -r file
|
||||
do
|
||||
for path in ${paths_arr[*]}
|
||||
do
|
||||
if [[ $file == $path* ]]; then
|
||||
echo "$file - match path: $path"
|
||||
matched=true
|
||||
break
|
||||
fi
|
||||
done
|
||||
if [[ $matched == true ]]; then
|
||||
break
|
||||
else
|
||||
echo "$file - does not match any given path"
|
||||
fi
|
||||
done < files.txt
|
||||
|
||||
printf "\n========== Result ==========\n"
|
||||
if [[ $matched = true ]]; then
|
||||
echo "match found"
|
||||
echo "::set-output name=matched::true"
|
||||
else
|
||||
echo "no match found"
|
||||
echo "::set-output name=matched::false"
|
||||
fi
|
||||
@@ -325,15 +325,22 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.MizuApiFilteringOptions,
|
||||
tapperSyncer.config.LogLevel,
|
||||
tapperSyncer.config.ServiceMesh,
|
||||
tapperSyncer.config.Tls,
|
||||
); err != nil {
|
||||
tapperSyncer.config.Tls); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
|
||||
} else {
|
||||
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
|
||||
if err := tapperSyncer.kubernetesProvider.ResetMizuTapperDaemonSet(
|
||||
tapperSyncer.context,
|
||||
tapperSyncer.config.MizuResourcesNamespace,
|
||||
TapperDaemonSetName,
|
||||
tapperSyncer.config.AgentImage,
|
||||
TapperPodName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Successfully reset tapper daemon set")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -449,9 +449,9 @@ func (provider *Provider) CanI(ctx context.Context, namespace string, resource s
|
||||
Spec: auth.SelfSubjectAccessReviewSpec{
|
||||
ResourceAttributes: &auth.ResourceAttributes{
|
||||
Namespace: namespace,
|
||||
Resource: resource,
|
||||
Verb: verb,
|
||||
Group: group,
|
||||
Resource: resource,
|
||||
Verb: verb,
|
||||
Group: group,
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -995,6 +995,55 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) ResetMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error {
|
||||
agentContainer := applyconfcore.Container()
|
||||
agentContainer.WithName(tapperPodName)
|
||||
agentContainer.WithImage(podImage)
|
||||
|
||||
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
|
||||
nodeSelectorRequirement.WithKey("mizu-non-existing-label")
|
||||
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpExists)
|
||||
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
|
||||
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
|
||||
nodeSelector := applyconfcore.NodeSelector()
|
||||
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
|
||||
nodeAffinity := applyconfcore.NodeAffinity()
|
||||
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
|
||||
affinity := applyconfcore.Affinity()
|
||||
affinity.WithNodeAffinity(nodeAffinity)
|
||||
|
||||
podSpec := applyconfcore.PodSpec()
|
||||
podSpec.WithContainers(agentContainer)
|
||||
podSpec.WithAffinity(affinity)
|
||||
|
||||
podTemplate := applyconfcore.PodTemplateSpec()
|
||||
podTemplate.WithLabels(map[string]string{
|
||||
"app": tapperPodName,
|
||||
LabelManagedBy: provider.managedBy,
|
||||
LabelCreatedBy: provider.createdBy,
|
||||
})
|
||||
podTemplate.WithSpec(podSpec)
|
||||
|
||||
labelSelector := applyconfmeta.LabelSelector()
|
||||
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
|
||||
|
||||
applyOptions := metav1.ApplyOptions{
|
||||
Force: true,
|
||||
FieldManager: fieldManagerName,
|
||||
}
|
||||
|
||||
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
|
||||
daemonSet.
|
||||
WithLabels(map[string]string{
|
||||
LabelManagedBy: provider.managedBy,
|
||||
LabelCreatedBy: provider.createdBy,
|
||||
}).
|
||||
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
|
||||
|
||||
_, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) {
|
||||
var pods []core.Pod
|
||||
for _, namespace := range namespaces {
|
||||
@@ -1038,7 +1087,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
|
||||
return matchingPods, nil
|
||||
}
|
||||
|
||||
func(provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
|
||||
func (provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
|
||||
pods, err := provider.clientSet.CoreV1().Pods(namespaces).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", labelName)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -100,6 +100,7 @@ type Dissector interface {
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
|
||||
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
|
||||
Summarize(entry *Entry) *BaseEntry
|
||||
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
|
||||
Macros() map[string]string
|
||||
NewResponseRequestMatcher() RequestResponseMatcher
|
||||
@@ -135,12 +136,7 @@ type Entry struct {
|
||||
StartTime time.Time `json:"startTime"`
|
||||
Request map[string]interface{} `json:"request"`
|
||||
Response map[string]interface{} `json:"response"`
|
||||
Summary string `json:"summary"`
|
||||
Method string `json:"method"`
|
||||
Status int `json:"status"`
|
||||
ElapsedTime int64 `json:"elapsedTime"`
|
||||
Path string `json:"path"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty"`
|
||||
Rules ApplicableRules `json:"rules,omitempty"`
|
||||
ContractStatus ContractStatus `json:"contractStatus,omitempty"`
|
||||
ContractRequestReason string `json:"contractRequestReason,omitempty"`
|
||||
@@ -154,6 +150,7 @@ type EntryWrapper struct {
|
||||
Representation string `json:"representation"`
|
||||
BodySize int64 `json:"bodySize"`
|
||||
Data *Entry `json:"data"`
|
||||
Base *BaseEntry `json:"base"`
|
||||
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
|
||||
IsRulesEnabled bool `json:"isRulesEnabled"`
|
||||
}
|
||||
@@ -161,11 +158,12 @@ type EntryWrapper struct {
|
||||
type BaseEntry struct {
|
||||
Id uint `json:"id"`
|
||||
Protocol Protocol `json:"proto,omitempty"`
|
||||
Url string `json:"url,omitempty"`
|
||||
Path string `json:"path,omitempty"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
StatusCode int `json:"status"`
|
||||
SummaryQuery string `json:"summaryQuery,omitempty"`
|
||||
Status int `json:"status"`
|
||||
StatusQuery string `json:"statusQuery"`
|
||||
Method string `json:"method,omitempty"`
|
||||
MethodQuery string `json:"methodQuery,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
Source *TCP `json:"src"`
|
||||
Destination *TCP `json:"dst"`
|
||||
@@ -190,44 +188,6 @@ type Contract struct {
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
func Summarize(entry *Entry) *BaseEntry {
|
||||
return &BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Path: entry.Path,
|
||||
Summary: entry.Summary,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
type DataUnmarshaler interface {
|
||||
UnmarshalData(*Entry) error
|
||||
}
|
||||
|
||||
func (bed *BaseEntry) UnmarshalData(entry *Entry) error {
|
||||
bed.Protocol = entry.Protocol
|
||||
bed.Id = entry.Id
|
||||
bed.Path = entry.Path
|
||||
bed.Summary = entry.Summary
|
||||
bed.StatusCode = entry.Status
|
||||
bed.Method = entry.Method
|
||||
bed.Timestamp = entry.Timestamp
|
||||
bed.Source = entry.Source
|
||||
bed.Destination = entry.Destination
|
||||
bed.IsOutgoing = entry.IsOutgoing
|
||||
bed.Latency = entry.ElapsedTime
|
||||
bed.ContractStatus = entry.ContractStatus
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
TABLE string = "table"
|
||||
BODY string = "body"
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/amqp/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect
|
||||
|
||||
@@ -219,31 +219,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
|
||||
summary := ""
|
||||
switch request["method"] {
|
||||
case basicMethodMap[40]:
|
||||
summary = reqDetails["exchange"].(string)
|
||||
case basicMethodMap[60]:
|
||||
summary = reqDetails["exchange"].(string)
|
||||
case exchangeMethodMap[10]:
|
||||
summary = reqDetails["exchange"].(string)
|
||||
case queueMethodMap[10]:
|
||||
summary = reqDetails["queue"].(string)
|
||||
case connectionMethodMap[10]:
|
||||
summary = fmt.Sprintf(
|
||||
"%s.%s",
|
||||
strconv.Itoa(int(reqDetails["versionMajor"].(float64))),
|
||||
strconv.Itoa(int(reqDetails["versionMinor"].(float64))),
|
||||
)
|
||||
case connectionMethodMap[50]:
|
||||
summary = reqDetails["replyText"].(string)
|
||||
case queueMethodMap[20]:
|
||||
summary = reqDetails["queue"].(string)
|
||||
case basicMethodMap[20]:
|
||||
summary = reqDetails["queue"].(string)
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
reqDetails["method"] = request["method"]
|
||||
return &api.Entry{
|
||||
Protocol: protocol,
|
||||
@@ -260,17 +235,70 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Namespace: namespace,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Method: request["method"].(string),
|
||||
Status: 0,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: 0,
|
||||
Summary: summary,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
method := entry.Request["method"].(string)
|
||||
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
|
||||
switch method {
|
||||
case basicMethodMap[40]:
|
||||
summary = entry.Request["exchange"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
||||
case basicMethodMap[60]:
|
||||
summary = entry.Request["exchange"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
||||
case exchangeMethodMap[10]:
|
||||
summary = entry.Request["exchange"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
|
||||
case queueMethodMap[10]:
|
||||
summary = entry.Request["queue"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
||||
case connectionMethodMap[10]:
|
||||
versionMajor := int(entry.Request["versionMajor"].(float64))
|
||||
versionMinor := int(entry.Request["versionMinor"].(float64))
|
||||
summary = fmt.Sprintf(
|
||||
"%s.%s",
|
||||
strconv.Itoa(versionMajor),
|
||||
strconv.Itoa(versionMinor),
|
||||
)
|
||||
summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
|
||||
case connectionMethodMap[50]:
|
||||
summary = entry.Request["replyText"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
|
||||
case queueMethodMap[20]:
|
||||
summary = entry.Request["queue"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
||||
case basicMethodMap[20]:
|
||||
summary = entry.Request["queue"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
|
||||
}
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: 0,
|
||||
StatusQuery: "",
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{})
|
||||
|
||||
@@ -21,14 +21,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -186,7 +188,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -230,6 +232,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -243,7 +302,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect
|
||||
|
||||
@@ -231,7 +231,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
reqDetails["targetUri"] = reqDetails["url"]
|
||||
reqDetails["path"] = path
|
||||
reqDetails["pathSegments"] = strings.Split(path, "/")[1:]
|
||||
reqDetails["summary"] = path
|
||||
|
||||
// Rearrange the maps for the querying
|
||||
reqDetails["_headers"] = reqDetails["headers"]
|
||||
@@ -248,17 +247,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
reqDetails["_queryStringMerged"] = mapSliceMergeRepeatedKeys(reqDetails["_queryString"].([]interface{}))
|
||||
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryStringMerged"].([]interface{}))
|
||||
|
||||
method := reqDetails["method"].(string)
|
||||
statusCode := int(resDetails["status"].(float64))
|
||||
if item.Protocol.Abbreviation == "gRPC" {
|
||||
resDetails["statusText"] = grpcStatusCodes[statusCode]
|
||||
}
|
||||
|
||||
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
|
||||
reqDetails["url"] = path
|
||||
request["url"] = path
|
||||
}
|
||||
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
@@ -280,17 +273,40 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
Method: method,
|
||||
Status: statusCode,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: path,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
HTTPPair: string(httpPair),
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
summary := entry.Request["path"].(string)
|
||||
summaryQuery := fmt.Sprintf(`request.path == "%s"`, summary)
|
||||
method := entry.Request["method"].(string)
|
||||
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
|
||||
status := int(entry.Response["status"].(float64))
|
||||
statusQuery := fmt.Sprintf(`response.status == %d`, status)
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: status,
|
||||
StatusQuery: statusQuery,
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
|
||||
details, _ := json.Marshal([]api.TableData{
|
||||
{
|
||||
|
||||
@@ -21,14 +21,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -188,7 +190,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -232,6 +234,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -245,7 +304,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/kafka/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect
|
||||
|
||||
@@ -61,83 +61,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
apiKey := ApiKey(reqDetails["apiKey"].(float64))
|
||||
|
||||
summary := ""
|
||||
switch apiKey {
|
||||
case Metadata:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case ApiVersions:
|
||||
summary = reqDetails["clientID"].(string)
|
||||
case Produce:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topicData"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case Fetch:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case ListOffsets:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case CreateTopics:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if reqDetails["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := reqDetails["topicNames"].([]string)
|
||||
for _, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
}
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
@@ -158,13 +82,127 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
|
||||
Method: apiNames[apiKey],
|
||||
Status: 0,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: summary,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
status := 0
|
||||
statusQuery := ""
|
||||
|
||||
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
||||
method := apiNames[apiKey]
|
||||
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
|
||||
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
switch apiKey {
|
||||
case Metadata:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case ApiVersions:
|
||||
summary = entry.Request["clientID"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.clientID == "%s"`, summary)
|
||||
case Produce:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topicData"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topicData[%d].topic == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case Fetch:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].topic == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case ListOffsets:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case CreateTopics:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if entry.Request["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := entry.Request["topicNames"].([]string)
|
||||
for i, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
summaryQuery += fmt.Sprintf(`request.topicNames[%d] == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
}
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: status,
|
||||
StatusQuery: statusQuery,
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,14 +21,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/redis/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect
|
||||
|
||||
@@ -65,17 +65,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
resDetails := response["details"].(map[string]interface{})
|
||||
|
||||
method := ""
|
||||
if reqDetails["command"] != nil {
|
||||
method = reqDetails["command"].(string)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
if reqDetails["key"] != nil {
|
||||
summary = reqDetails["key"].(string)
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
@@ -96,17 +85,50 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
Method: method,
|
||||
Status: 0,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: summary,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
status := 0
|
||||
statusQuery := ""
|
||||
|
||||
method := ""
|
||||
methodQuery := ""
|
||||
if entry.Request["command"] != nil {
|
||||
method = entry.Request["command"].(string)
|
||||
methodQuery = fmt.Sprintf(`request.command == "%s"`, method)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
if entry.Request["key"] != nil {
|
||||
summary = entry.Request["key"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.key == "%s"`, summary)
|
||||
}
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: status,
|
||||
StatusQuery: statusQuery,
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{})
|
||||
|
||||
@@ -22,14 +22,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM alpine:3
|
||||
FROM alpine:3.14
|
||||
|
||||
RUN apk --no-cache update && apk --no-cache add clang llvm libbpf-dev go linux-headers
|
||||
|
||||
|
||||
@@ -132,8 +132,22 @@ func extractCgroup(lines []string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// cgroup in the /proc/<pid>/cgroup may look something like
|
||||
//
|
||||
// /system.slice/docker-<ID>.scope
|
||||
// /system.slice/containerd-<ID>.scope
|
||||
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
|
||||
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
|
||||
//
|
||||
// This function extract the <ID> out of the cgroup path, the <ID> should match
|
||||
// the "Container ID:" field when running kubectl describe pod <POD>
|
||||
//
|
||||
func normalizeCgroup(cgrouppath string) string {
|
||||
basename := strings.TrimSpace(path.Base(cgrouppath))
|
||||
|
||||
if strings.Contains(basename, "-") {
|
||||
basename = basename[strings.Index(basename, "-") + 1:]
|
||||
}
|
||||
|
||||
if strings.Contains(basename, ".") {
|
||||
return strings.TrimSuffix(basename, filepath.Ext(basename))
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
// Code generated by bpf2go; DO NOT EDIT.
|
||||
//go:build arm64be || armbe || mips || mips64 || mips64p32 || ppc64 || s390 || s390x || sparc || sparc64
|
||||
// +build arm64be armbe mips mips64 mips64p32 ppc64 s390 s390x sparc sparc64
|
||||
|
||||
package tlstapper
|
||||
|
||||
Binary file not shown.
@@ -1,5 +1,4 @@
|
||||
// Code generated by bpf2go; DO NOT EDIT.
|
||||
//go:build 386 || amd64 || amd64p32 || arm || arm64 || mips64le || mips64p32le || mipsle || ppc64le || riscv64
|
||||
// +build 386 amd64 amd64p32 arm arm64 mips64le mips64p32le mipsle ppc64le riscv64
|
||||
|
||||
package tlstapper
|
||||
|
||||
Binary file not shown.
@@ -119,7 +119,7 @@ export const EntryDetailed = () => {
|
||||
bodySize={entryData.bodySize}
|
||||
elapsedTime={entryData.data.elapsedTime}
|
||||
/>}
|
||||
{entryData && <EntrySummary entry={entryData.data}/>}
|
||||
{entryData && <EntrySummary entry={entryData.base}/>}
|
||||
<>
|
||||
{entryData && <EntryViewer
|
||||
representation={entryData.representation}
|
||||
|
||||
@@ -25,9 +25,12 @@ interface TCPInterface {
|
||||
interface Entry {
|
||||
proto: ProtocolInterface,
|
||||
method?: string,
|
||||
methodQuery?: string,
|
||||
summary: string,
|
||||
summaryQuery: string,
|
||||
id: number,
|
||||
status?: number;
|
||||
statusQuery?: string;
|
||||
timestamp: Date;
|
||||
src: TCPInterface,
|
||||
dst: TCPInterface,
|
||||
@@ -152,10 +155,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
|
||||
horizontal={false}
|
||||
/> : null}
|
||||
{isStatusCodeEnabled && <div>
|
||||
<StatusCode statusCode={entry.status}/>
|
||||
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
|
||||
</div>}
|
||||
<div className={styles.endpointServiceContainer} style={{paddingLeft: endpointServiceContainer}}>
|
||||
<Summary method={entry.method} summary={entry.summary}/>
|
||||
<Summary method={entry.method} methodQuery={entry.methodQuery} summary={entry.summary} summaryQuery={entry.summaryQuery}/>
|
||||
<div className={styles.resolvedName}>
|
||||
<Queryable
|
||||
query={`src.name == "${entry.src.name}"`}
|
||||
|
||||
@@ -10,14 +10,15 @@ export enum StatusCodeClassification {
|
||||
|
||||
interface EntryProps {
|
||||
statusCode: number
|
||||
statusQuery: string
|
||||
}
|
||||
|
||||
const StatusCode: React.FC<EntryProps> = ({statusCode}) => {
|
||||
const StatusCode: React.FC<EntryProps> = ({statusCode, statusQuery}) => {
|
||||
|
||||
const classification = getClassification(statusCode)
|
||||
|
||||
return <Queryable
|
||||
query={`response.status == ${statusCode}`}
|
||||
query={statusQuery}
|
||||
displayIconOnMouseOver={true}
|
||||
flipped={true}
|
||||
iconStyle={{marginTop: "40px", paddingLeft: "10px"}}
|
||||
|
||||
@@ -5,14 +5,16 @@ import Queryable from "./Queryable";
|
||||
|
||||
interface SummaryProps {
|
||||
method: string
|
||||
methodQuery: string
|
||||
summary: string
|
||||
summaryQuery: string
|
||||
}
|
||||
|
||||
export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
|
||||
export const Summary: React.FC<SummaryProps> = ({method, methodQuery, summary, summaryQuery}) => {
|
||||
|
||||
return <div className={styles.container}>
|
||||
{method && <Queryable
|
||||
query={`method == "${method}"`}
|
||||
query={methodQuery}
|
||||
className={`${miscStyles.protocol} ${miscStyles.method}`}
|
||||
displayIconOnMouseOver={true}
|
||||
style={{whiteSpace: "nowrap"}}
|
||||
@@ -24,7 +26,7 @@ export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
|
||||
</span>
|
||||
</Queryable>}
|
||||
{summary && <Queryable
|
||||
query={`summary == "${summary}"`}
|
||||
query={summaryQuery}
|
||||
displayIconOnMouseOver={true}
|
||||
flipped={true}
|
||||
iconStyle={{zIndex:"5",position:"relative",right:"14px"}}
|
||||
|
||||
Reference in New Issue
Block a user